-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #75 from xenserver-next/fix-MountingAccessor-write…
…File Test, cover and fix the method writeFile() of subclasses of xcp.accessor.MountingAccessor
- Loading branch information
Showing
3 changed files
with
238 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
from _typeshed import Incomplete # pylint: disable=import-error | ||
|
||
|
||
def set_uid(uid) -> None: ... | ||
def set_gid(gid) -> None: ... | ||
def reset_ids() -> None: ... | ||
def is_root(): ... | ||
|
||
|
||
class FakeFilesystem: | ||
path_separator: Incomplete | ||
alternative_path_separator: Incomplete | ||
patcher: Incomplete | ||
is_windows_fs: Incomplete | ||
is_macos: Incomplete | ||
is_case_sensitive: Incomplete | ||
root: Incomplete | ||
cwd: Incomplete | ||
umask: Incomplete | ||
open_files: Incomplete | ||
mount_points: Incomplete | ||
dev_null: Incomplete | ||
def __init__( | ||
self, | ||
path_separator=..., | ||
total_size: Incomplete | None = ..., | ||
patcher: Incomplete | None = ..., | ||
) -> None: ... | ||
@property | ||
def is_linux(self): ... | ||
def reset(self, total_size: Incomplete | None = ...) -> None: ... | ||
def pause(self) -> None: ... | ||
def resume(self) -> None: ... | ||
def line_separator(self): ... | ||
def raise_os_error( | ||
self, errno, filename: Incomplete | None = ..., winerror: Incomplete | None = ... | ||
) -> None: ... | ||
def raise_io_error(self, errno, filename: Incomplete | None = ...) -> None: ... | ||
def add_mount_point(self, path, total_size: Incomplete | None = ...): ... | ||
def get_disk_usage(self, path: Incomplete | None = ...): ... | ||
def set_disk_usage(self, total_size, path: Incomplete | None = ...) -> None: ... | ||
def change_disk_usage(self, usage_change, file_path, st_dev) -> None: ... | ||
def stat(self, entry_path, follow_symlinks: bool = ...): ... | ||
def chmod(self, path, mode, follow_symlinks: bool = ...) -> None: ... | ||
def utime( | ||
self, | ||
path, | ||
times: Incomplete | None = ..., | ||
ns: Incomplete | None = ..., | ||
follow_symlinks: bool = ..., | ||
) -> None: ... | ||
def get_open_file(self, file_des): ... | ||
def has_open_file(self, file_object): ... | ||
def normcase(self, path): ... | ||
def normpath(self, path): ... | ||
def absnormpath(self, path): ... | ||
def splitpath(self, path): ... | ||
def splitdrive(self, path): ... | ||
def joinpaths(self, *paths): ... | ||
def ends_with_path_separator(self, file_path): ... | ||
def is_filepath_ending_with_separator(self, path): ... | ||
def exists(self, file_path, check_link: bool = ...): ... | ||
def resolve_path(self, file_path, allow_fd: bool = ..., raw_io: bool = ...): ... | ||
def get_object_from_normpath(self, file_path, check_read_perm: bool = ...): ... | ||
def get_object(self, file_path, check_read_perm: bool = ...): ... | ||
def resolve( | ||
self, | ||
file_path, | ||
follow_symlinks: bool = ..., | ||
allow_fd: bool = ..., | ||
check_read_perm: bool = ..., | ||
): ... | ||
def lresolve(self, path): ... | ||
def add_object(self, file_path, file_object, error_fct: Incomplete | None = ...) -> None: ... | ||
def rename(self, old_file_path, new_file_path, force_replace: bool = ...) -> None: ... | ||
def remove_object(self, file_path) -> None: ... | ||
def make_string_path(self, path): ... | ||
def create_dir(self, directory_path, perm_bits=...): ... | ||
def create_file( | ||
self, | ||
file_path, | ||
st_mode=..., | ||
contents: str = ..., | ||
st_size: Incomplete | None = ..., | ||
create_missing_dirs: bool = ..., | ||
apply_umask: bool = ..., | ||
encoding: Incomplete | None = ..., | ||
errors: Incomplete | None = ..., | ||
side_effect: Incomplete | None = ..., | ||
): ... | ||
def add_real_file( | ||
self, source_path, read_only: bool = ..., target_path: Incomplete | None = ... | ||
): ... | ||
def add_real_symlink(self, source_path, target_path: Incomplete | None = ...): ... | ||
def add_real_directory( | ||
self, | ||
source_path, | ||
read_only: bool = ..., | ||
lazy_read: bool = ..., | ||
target_path: Incomplete | None = ..., | ||
): ... | ||
def add_real_paths( | ||
self, path_list, read_only: bool = ..., lazy_dir_read: bool = ... | ||
) -> None: ... | ||
def create_file_internally( | ||
self, | ||
file_path, | ||
st_mode=..., | ||
contents: str = ..., | ||
st_size: Incomplete | None = ..., | ||
create_missing_dirs: bool = ..., | ||
apply_umask: bool = ..., | ||
encoding: Incomplete | None = ..., | ||
errors: Incomplete | None = ..., | ||
read_from_real_fs: bool = ..., | ||
raw_io: bool = ..., | ||
side_effect: Incomplete | None = ..., | ||
): ... | ||
def create_symlink(self, file_path, link_target, create_missing_dirs: bool = ...): ... | ||
def link(self, old_path, new_path, follow_symlinks: bool = ...): ... | ||
def readlink(self, path): ... | ||
def makedir(self, dir_name, mode=...) -> None: ... | ||
def makedirs(self, dir_name, mode=..., exist_ok: bool = ...) -> None: ... | ||
def isdir(self, path, follow_symlinks: bool = ...): ... | ||
def isfile(self, path, follow_symlinks: bool = ...): ... | ||
def islink(self, path): ... | ||
def confirmdir(self, target_directory): ... | ||
def remove(self, path) -> None: ... | ||
def rmdir(self, target_directory, allow_symlink: bool = ...) -> None: ... | ||
def listdir(self, target_directory): ... | ||
|
||
class FakeFileOpen: | ||
__name__: str | ||
filesystem: Incomplete | ||
raw_io: Incomplete | ||
def __init__( | ||
self, filesystem, delete_on_close: bool = ..., use_io: bool = ..., raw_io: bool = ... | ||
) -> None: ... | ||
def __call__(self, *args, **kwargs): ... | ||
def call( | ||
self, | ||
file_, | ||
mode: str = ..., | ||
buffering: int = ..., | ||
encoding: Incomplete | None = ..., | ||
errors: Incomplete | None = ..., | ||
newline: Incomplete | None = ..., | ||
closefd: bool = ..., | ||
opener: Incomplete | None = ..., | ||
open_modes: Incomplete | None = ..., | ||
): ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
"""pytest tests testing subclasses of xcp.accessor.MountingAccessor using pyfakefs""" | ||
import sys | ||
from io import BytesIO | ||
from typing import cast | ||
|
||
from mock import patch | ||
from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem | ||
|
||
import xcp.accessor | ||
|
||
binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff" | ||
|
||
|
||
def test_device_accessor(fs): | ||
# type: (FakeFilesystem) -> None | ||
accessor = xcp.accessor.createAccessor("dev:///dev/device", False) | ||
check_mounting_accessor(accessor, fs) | ||
|
||
|
||
def test_nfs_accessor(fs): | ||
# type: (FakeFilesystem) -> None | ||
accessor = xcp.accessor.createAccessor("nfs://server/path", False) | ||
check_mounting_accessor(accessor, fs) | ||
|
||
|
||
def check_mounting_accessor(accessor, fs): | ||
# type: (xcp.accessor.MountingAccessor, FakeFilesystem) -> None | ||
"""Test subclasses of MountingAccessor (with xcp.cmd.runCmd in xcp.mount mocked)""" | ||
|
||
with patch("xcp.cmd.runCmd") as mount_runcmd: | ||
mount_runcmd.return_value = (0, "", "") | ||
accessor.start() | ||
|
||
assert accessor.location | ||
assert fs.isdir(accessor.location) | ||
|
||
location = accessor.location | ||
|
||
if sys.version_info.major >= 3: | ||
fs.add_mount_point(location) | ||
|
||
assert check_binary_read(accessor, location, fs) | ||
assert check_binary_write(accessor, location, fs) | ||
|
||
if sys.version_info.major >= 3: | ||
fs.mount_points.pop(location) | ||
|
||
with patch("xcp.cmd.runCmd"): | ||
accessor.finish() | ||
|
||
assert not fs.exists(location) | ||
|
||
assert not accessor.location | ||
|
||
|
||
def check_binary_read(accessor, location, fs): | ||
# type: (xcp.accessor.MountingAccessor, str, FakeFilesystem) -> bool | ||
"""Test the openAddress() method of subclasses of xcp.accessor.MountingAccessor""" | ||
|
||
name = "binary_file" | ||
path = location + "/" + name | ||
|
||
assert fs.create_file(path, contents=cast(str, binary_data)) | ||
|
||
assert accessor.access(name) | ||
|
||
binary_file = accessor.openAddress(name) | ||
assert not isinstance(binary_file, bool) | ||
|
||
fs.remove(path) | ||
return cast(bytes, binary_file.read()) == binary_data | ||
|
||
|
||
def check_binary_write(accessor, location, fs): | ||
# type: (xcp.accessor.MountingAccessor, str, FakeFilesystem) -> bool | ||
"""Test the writeFile() method of subclasses of xcp.accessor.MountingAccessor""" | ||
|
||
name = "binary_file_written_by_accessor" | ||
accessor.writeFile(BytesIO(binary_data), name) | ||
|
||
assert accessor.access(name) | ||
|
||
with FakeFileOpen(fs, delete_on_close=True)(location + "/" + name, "rb") as written: | ||
return cast(bytes, written.read()) == binary_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters