Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-43012: remove pathlib._Accessor #25701

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 59 additions & 112 deletions Lib/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,93 +275,6 @@ def make_uri(self, path):
_posix_flavour = _PosixFlavour()


class _Accessor:
"""An accessor implements a particular (system-specific or not) way of
accessing paths on the filesystem."""


class _NormalAccessor(_Accessor):

stat = os.stat

open = io.open

listdir = os.listdir

scandir = os.scandir

chmod = os.chmod

mkdir = os.mkdir

unlink = os.unlink

if hasattr(os, "link"):
link = os.link
else:
def link(self, src, dst):
raise NotImplementedError("os.link() not available on this system")

rmdir = os.rmdir

rename = os.rename

replace = os.replace

if hasattr(os, "symlink"):
symlink = os.symlink
else:
def symlink(self, src, dst, target_is_directory=False):
raise NotImplementedError("os.symlink() not available on this system")

def touch(self, path, mode=0o666, exist_ok=True):
if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(path, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(path, flags, mode)
os.close(fd)

if hasattr(os, "readlink"):
readlink = os.readlink
else:
def readlink(self, path):
raise NotImplementedError("os.readlink() not available on this system")

def owner(self, path):
try:
import pwd
return pwd.getpwuid(self.stat(path).st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self, path):
try:
import grp
return grp.getgrgid(self.stat(path).st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

getcwd = os.getcwd

expanduser = staticmethod(os.path.expanduser)

realpath = staticmethod(os.path.realpath)


_normal_accessor = _NormalAccessor()


#
# Globbing helpers
#
Expand Down Expand Up @@ -402,7 +315,7 @@ def select_from(self, parent_path):
path_cls = type(parent_path)
is_dir = path_cls.is_dir
exists = path_cls.exists
scandir = parent_path._accessor.scandir
scandir = path_cls._scandir
if not is_dir(parent_path):
return iter([])
return self._select_from(parent_path, is_dir, exists, scandir)
Expand Down Expand Up @@ -949,7 +862,6 @@ class Path(PurePath):
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
"""
_accessor = _normal_accessor
__slots__ = ()

def __new__(cls, *args, **kwargs):
Expand Down Expand Up @@ -988,7 +900,7 @@ def cwd(cls):
"""Return a new path pointing to the current working directory
(as returned by os.getcwd()).
"""
return cls(cls._accessor.getcwd())
return cls(os.getcwd())

@classmethod
def home(cls):
Expand All @@ -1005,16 +917,22 @@ def samefile(self, other_path):
try:
other_st = other_path.stat()
except AttributeError:
other_st = self._accessor.stat(other_path)
other_st = self.__class__(other_path).stat()
return os.path.samestat(st, other_st)

def iterdir(self):
"""Iterate over the files in this directory. Does not yield any
result for the special paths '.' and '..'.
"""
for name in self._accessor.listdir(self):
for name in os.listdir(self):
yield self._make_child_relpath(name)

def _scandir(self):
# bpo-24132: a future version of pathlib will support subclassing of
# pathlib.Path to customize how the filesystem is accessed. This
# includes scandir(), which is used to implement glob().
return os.scandir(self)

def glob(self, pattern):
"""Iterate over this subtree and yield all existing files (of any
kind, including directories) matching the given relative pattern.
Expand Down Expand Up @@ -1050,7 +968,7 @@ def absolute(self):
"""
if self.is_absolute():
return self
return self._from_parts([self._accessor.getcwd()] + self._parts)
return self._from_parts([self.cwd()] + self._parts)

def resolve(self, strict=False):
"""
Expand All @@ -1064,7 +982,7 @@ def check_eloop(e):
raise RuntimeError("Symlink loop from %r" % e.filename)

try:
s = self._accessor.realpath(self, strict=strict)
s = os.path.realpath(self, strict=strict)
except OSError as e:
check_eloop(e)
raise
Expand All @@ -1084,19 +1002,28 @@ def stat(self, *, follow_symlinks=True):
Return the result of the stat() system call on this path, like
os.stat() does.
"""
return self._accessor.stat(self, follow_symlinks=follow_symlinks)
return os.stat(self, follow_symlinks=follow_symlinks)

def owner(self):
"""
Return the login name of the file owner.
"""
return self._accessor.owner(self)
try:
import pwd
return pwd.getpwuid(self.stat().st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self):
"""
Return the group name of the file gid.
"""
return self._accessor.group(self)

try:
import grp
return grp.getgrgid(self.stat().st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
Expand All @@ -1106,8 +1033,7 @@ def open(self, mode='r', buffering=-1, encoding=None,
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
return self._accessor.open(self, mode, buffering, encoding, errors,
newline)
return io.open(self, mode, buffering, encoding, errors, newline)

def read_bytes(self):
"""
Expand Down Expand Up @@ -1148,21 +1074,38 @@ def readlink(self):
"""
Return the path to which the symbolic link points.
"""
path = self._accessor.readlink(self)
return self._from_parts((path,))
if not hasattr(os, "readlink"):
raise NotImplementedError("os.readlink() not available on this system")
return self._from_parts((os.readlink(self),))

def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
self._accessor.touch(self, mode, exist_ok)

if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(self, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(self, flags, mode)
os.close(fd)

def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
try:
self._accessor.mkdir(self, mode)
os.mkdir(self, mode)
except FileNotFoundError:
if not parents or self.parent == self:
raise
Expand All @@ -1178,7 +1121,7 @@ def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
"""
self._accessor.chmod(self, mode, follow_symlinks=follow_symlinks)
os.chmod(self, mode, follow_symlinks=follow_symlinks)

def lchmod(self, mode):
"""
Expand All @@ -1193,7 +1136,7 @@ def unlink(self, missing_ok=False):
If the path is a directory, use rmdir() instead.
"""
try:
self._accessor.unlink(self)
os.unlink(self)
except FileNotFoundError:
if not missing_ok:
raise
Expand All @@ -1202,7 +1145,7 @@ def rmdir(self):
"""
Remove this directory. The directory must be empty.
"""
self._accessor.rmdir(self)
os.rmdir(self)

def lstat(self):
"""
Expand All @@ -1221,7 +1164,7 @@ def rename(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.rename(self, target)
os.rename(self, target)
return self.__class__(target)

def replace(self, target):
Expand All @@ -1234,23 +1177,27 @@ def replace(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.replace(self, target)
os.replace(self, target)
return self.__class__(target)

def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
self._accessor.symlink(target, self, target_is_directory)
if not hasattr(os, "symlink"):
raise NotImplementedError("os.symlink() not available on this system")
os.symlink(target, self, target_is_directory)

def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.

Note the order of arguments (self, target) is the reverse of os.link's.
"""
self._accessor.link(target, self)
if not hasattr(os, "link"):
raise NotImplementedError("os.link() not available on this system")
os.link(target, self)

def link_to(self, target):
"""
Expand All @@ -1268,7 +1215,7 @@ def link_to(self, target):
"for removal in Python 3.12. "
"Use pathlib.Path.hardlink_to() instead.",
DeprecationWarning, stacklevel=2)
self._accessor.link(self, target)
self.__class__(target).hardlink_to(self)

# Convenience functions for querying the stat results

Expand Down Expand Up @@ -1425,7 +1372,7 @@ def expanduser(self):
"""
if (not (self._drv or self._root) and
self._parts and self._parts[0][:1] == '~'):
homedir = self._accessor.expanduser(self._parts[0])
homedir = os.path.expanduser(self._parts[0])
if homedir[:1] == "~":
raise RuntimeError("Could not determine home directory.")
return self._from_parts([homedir] + self._parts[1:])
Expand Down
Loading