From 071b548844dd2b300563bac92f49749592134023 Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sat, 16 Mar 2024 14:33:32 +0100 Subject: [PATCH] Add support for fake os.dup, os.dup2 and os.lseek - closes #970 --- CHANGES.md | 3 +- docs/troubleshooting.rst | 3 ++ pyfakefs/fake_file.py | 6 ++-- pyfakefs/fake_filesystem.py | 25 ++++++++++++-- pyfakefs/fake_os.py | 20 ++++++++++++ pyfakefs/tests/fake_os_test.py | 59 ++++++++++++++++++++++++++++++++++ 6 files changed, 110 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index cbfb705a..cf33df47 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,7 @@ The released versions correspond to PyPI releases. ## Unreleased ### Changes -* the handling of file permissions under Posix is should now mostly match the behavior +* the handling of file permissions under Posix should now mostly match the behavior of the real filesystem, which may change the behavior of some tests * removed the argument `module_cleanup_mode`, that was introduced as a temporary workaround in the previous version - related problems shall be handled using a cleanup handler @@ -12,6 +12,7 @@ The released versions correspond to PyPI releases. ### Enhancements * added support for `O_NOFOLLOW` and `O_DIRECTORY` flags in `os.open` (see [#972](../../issues/972) and [#974](../../issues/974)) +* added support for fake `os.dup`, `os.dup2` and `os.lseek` (see [#970](../../issues/970)) ### Fixes * fixed a specific problem on reloading a pandas-related module (see [#947](../../issues/947)), diff --git a/docs/troubleshooting.rst b/docs/troubleshooting.rst index 6cf0b31c..2a87cb91 100644 --- a/docs/troubleshooting.rst +++ b/docs/troubleshooting.rst @@ -223,6 +223,9 @@ is the convenience argument :ref:`allow_root_user`: def setUp(self): self.setUpPyfakefs(allow_root_user=False) +``Pyfakefs`` also handles file permissions under UNIX systems while accessing files. +If accessing files as another user and/or group, the respective group/other file +permissions are considered. .. _usage_with_mock_open: diff --git a/pyfakefs/fake_file.py b/pyfakefs/fake_file.py index 5d7d3ee1..b91127f6 100644 --- a/pyfakefs/fake_file.py +++ b/pyfakefs/fake_file.py @@ -858,7 +858,7 @@ def close(self) -> None: # if we get here, we have an open file descriptor # without write permission, which has to be closed assert self.filedes - self._filesystem._close_open_file(self.filedes) + self._filesystem.close_open_file(self.filedes) raise if self._filesystem.is_windows_fs and self._changed: @@ -866,7 +866,7 @@ def close(self) -> None: assert self.filedes is not None if self._closefd: - self._filesystem._close_open_file(self.filedes) + self._filesystem.close_open_file(self.filedes) else: open_files = self._filesystem.open_files[self.filedes] assert open_files is not None @@ -1288,7 +1288,7 @@ def fileno(self) -> int: def close(self) -> None: """Close the directory.""" assert self.filedes is not None - self._filesystem._close_open_file(self.filedes) + self._filesystem.close_open_file(self.filedes) class FakePipeWrapper: diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index 4ff0879c..108c751b 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -814,7 +814,7 @@ def _handle_utime_arg_errors( if ns is not None and len(ns) != 2: raise TypeError("utime: 'ns' must be a tuple of two ints") - def add_open_file(self, file_obj: AnyFileWrapper) -> int: + def add_open_file(self, file_obj: AnyFileWrapper, new_fd: int = -1) -> int: """Add file_obj to the list of open files on the filesystem. Used internally to manage open files. @@ -822,10 +822,31 @@ def add_open_file(self, file_obj: AnyFileWrapper) -> int: Args: file_obj: File object to be added to open files list. + new_fd: The optional new file descriptor. Returns: File descriptor number for the file object. """ + if new_fd >= 0: + size = len(self.open_files) + if new_fd < size: + open_files = self.open_files[new_fd] + if open_files: + for f in open_files: + try: + f.close() + except OSError: + pass + if new_fd in self._free_fd_heap: + self._free_fd_heap.remove(new_fd) + self.open_files[new_fd] = [file_obj] + else: + for fd in range(size, new_fd): + self.open_files.append([]) + heapq.heappush(self._free_fd_heap, fd) + self.open_files.append([file_obj]) + return new_fd + if self._free_fd_heap: open_fd = heapq.heappop(self._free_fd_heap) self.open_files[open_fd] = [file_obj] @@ -834,7 +855,7 @@ def add_open_file(self, file_obj: AnyFileWrapper) -> int: self.open_files.append([file_obj]) return len(self.open_files) - 1 - def _close_open_file(self, file_des: int) -> None: + def close_open_file(self, file_des: int) -> None: """Remove file object with given descriptor from the list of open files. diff --git a/pyfakefs/fake_os.py b/pyfakefs/fake_os.py index 02eeb69f..565fb8a0 100644 --- a/pyfakefs/fake_os.py +++ b/pyfakefs/fake_os.py @@ -102,12 +102,15 @@ def dir() -> List[str]: "chmod", "chown", "close", + "dup", + "dup2", "fstat", "fsync", "getcwd", "lchmod", "link", "listdir", + "lseek", "lstat", "makedirs", "mkdir", @@ -321,6 +324,16 @@ def close(self, fd: int) -> None: file_handle = self.filesystem.get_open_file(fd) file_handle.close() + def dup(self, fd: int) -> int: + file_handle = self.filesystem.get_open_file(fd) + return self.filesystem.add_open_file(file_handle) + + def dup2(self, fd: int, fd2: int, inheritable: bool = True) -> int: + if fd == fd2: + return fd + file_handle = self.filesystem.get_open_file(fd) + return self.filesystem.add_open_file(file_handle, fd2) + def read(self, fd: int, n: int) -> bytes: """Read number of bytes from a file descriptor, returns bytes read. @@ -370,6 +383,13 @@ def write(self, fd: int, contents: bytes) -> int: file_handle.flush() return len(contents) + def lseek(self, fd: int, pos: int, whence: int): + file_handle = self.filesystem.get_open_file(fd) + if isinstance(file_handle, FakeFileWrapper): + file_handle.seek(pos, whence) + else: + raise OSError(errno.EBADF, "Bad file descriptor for fseek") + def pipe(self) -> Tuple[int, int]: read_fd, write_fd = os.pipe() read_wrapper = FakePipeWrapper(self.filesystem, read_fd, False) diff --git a/pyfakefs/tests/fake_os_test.py b/pyfakefs/tests/fake_os_test.py index b89ab70f..c9f69c19 100644 --- a/pyfakefs/tests/fake_os_test.py +++ b/pyfakefs/tests/fake_os_test.py @@ -3002,6 +3002,65 @@ def test_capabilities(self): os.stat in os.supports_effective_ids, ) + def test_dup(self): + with self.assertRaises(OSError) as cm: + self.os.dup(500) + self.assertEqual(errno.EBADF, cm.exception.errno) + file_path = self.make_path("test.txt") + self.create_file(file_path, contents="heythere") + fd1 = self.os.open(file_path, os.O_RDONLY) + fd2 = self.os.dup(fd1) + self.assertEqual(b"hey", self.os.read(fd1, 3)) + self.assertEqual(b"there", self.os.read(fd1, 10)) + self.os.close(fd1) + self.os.close(fd2) + + def test_dup_uses_freed_fd(self): + file_path1 = self.make_path("foo.txt") + file_path2 = self.make_path("bar.txt") + self.create_file(file_path1, contents="foo here") + self.create_file(file_path2, contents="bar here") + fd1 = self.os.open(file_path1, os.O_RDONLY) + fd2 = self.os.open(file_path2, os.O_RDONLY) + self.os.close(fd1) + fd3 = self.os.dup(fd2) + self.assertEqual(fd1, fd3) + self.os.close(fd2) + + def test_dup2_uses_existing_fd(self): + with self.assertRaises(OSError) as cm: + self.os.dup2(500, 501) + self.assertEqual(errno.EBADF, cm.exception.errno) + + file_path1 = self.make_path("foo.txt") + file_path2 = self.make_path("bar.txt") + self.create_file(file_path1, contents="foo here") + self.create_file(file_path2, contents="bar here") + fd1 = self.os.open(file_path1, os.O_RDONLY) + fd2 = self.os.open(file_path2, os.O_RDONLY) + self.assertEqual(b"bar", self.os.read(fd2, 3)) + fd2 = self.os.dup2(fd1, fd2) + self.assertEqual(b"foo", self.os.read(fd2, 3)) + self.os.lseek(fd2, 0, 0) + self.assertEqual(b"foo", self.os.read(fd1, 3)) + self.os.close(fd2) + + def test_dup2_with_new_fd(self): + file_path1 = self.make_path("foo.txt") + file_path2 = self.make_path("bar.txt") + self.create_file(file_path1) + self.create_file(file_path2) + fd1 = self.os.open(file_path1, os.O_RDONLY) + fd2 = fd1 + 2 + self.assertEqual(fd2, self.os.dup2(fd1, fd2)) + fd3 = self.os.open(file_path2, os.O_RDONLY) + fd4 = self.os.dup(fd3) + self.os.close(fd4) + self.os.close(fd2) + # we have a free position before fd2 that is now filled + self.assertEqual(fd1 + 1, fd3) + self.assertEqual(fd1 + 3, fd4) + class RealOsModuleTest(FakeOsModuleTest): def use_real_fs(self):