Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pathlib ABCs: remove duplicate realpath() implementation. #119178

Merged
merged 7 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 28 additions & 59 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"""

import functools
import posixpath
from glob import _Globber, _no_recurse_symlinks
from errno import ENOTDIR, ELOOP
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO


Expand Down Expand Up @@ -670,65 +670,34 @@ def resolve(self, strict=False):
"""
if self._resolving:
return self
path_root, parts = self._stack
path = self.with_segments(path_root)
try:
path = path.absolute()
except UnsupportedOperation:
path_tail = []
else:
path_root, path_tail = path._stack
path_tail.reverse()

# If the user has *not* overridden the `readlink()` method, then symlinks are unsupported
# and (in non-strict mode) we can improve performance by not calling `stat()`.
querying = strict or getattr(self.readlink, '_supported', True)
link_count = 0
while parts:
part = parts.pop()
if not part or part == '.':
continue
if part == '..':
if not path_tail:
if path_root:
# Delete '..' segment immediately following root
continue
elif path_tail[-1] != '..':
# Delete '..' segment and its predecessor
path_tail.pop()
continue
path_tail.append(part)
if querying and part != '..':
path = self.with_segments(path_root + self.parser.sep.join(path_tail))

def getcwd():
return str(self.with_segments().absolute())

if strict or getattr(self.readlink, '_supported', True):
def lstat(path_str):
path = self.with_segments(path_str)
path._resolving = True
return path.lstat()

def readlink(path_str):
path = self.with_segments(path_str)
path._resolving = True
try:
st = path.stat(follow_symlinks=False)
if S_ISLNK(st.st_mode):
# Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are
# encountered during resolution.
link_count += 1
if link_count >= self._max_symlinks:
raise OSError(ELOOP, "Too many symbolic links in path", self._raw_path)
target_root, target_parts = path.readlink()._stack
# If the symlink target is absolute (like '/etc/hosts'), set the current
# path to its uppermost parent (like '/').
if target_root:
path_root = target_root
path_tail.clear()
else:
path_tail.pop()
# Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to
# the stack of unresolved path parts.
parts.extend(target_parts)
continue
elif parts and not S_ISDIR(st.st_mode):
raise NotADirectoryError(ENOTDIR, "Not a directory", self._raw_path)
except OSError:
if strict:
raise
else:
querying = False
return self.with_segments(path_root + self.parser.sep.join(path_tail))
return str(path.readlink())
else:
# If the user has *not* overridden the `readlink()` method, then
# symlinks are unsupported and (in non-strict mode) we can improve
# performance by not calling `path.lstat()`.
def skip(path_str):
# This exception will be internally consumed by `_realpath()`.
raise OSError("Operation skipped.")

lstat = readlink = skip

return self.with_segments(posixpath._realpath(
str(self), strict, self.parser.sep,
getcwd=getcwd, lstat=lstat, readlink=readlink,
maxlinks=self._max_symlinks))

def symlink_to(self, target, target_is_directory=False):
"""
Expand Down
40 changes: 29 additions & 11 deletions Lib/posixpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
altsep = None
devnull = '/dev/null'

import errno
import os
import sys
import stat
Expand Down Expand Up @@ -432,7 +433,10 @@ def realpath(filename, *, strict=False):
curdir = '.'
pardir = '..'
getcwd = os.getcwd
return _realpath(filename, strict, sep, curdir, pardir, getcwd)

def _realpath(filename, strict=False, sep=sep, curdir=curdir, pardir=pardir,
getcwd=os.getcwd, lstat=os.lstat, readlink=os.readlink, maxlinks=None):
barneygale marked this conversation as resolved.
Show resolved Hide resolved
# The stack of unresolved path parts. When popped, a special value of None
# indicates that a symlink target has been resolved, and that the original
# symlink path can be retrieved by popping again. The [::-1] slice is a
Expand All @@ -449,6 +453,10 @@ def realpath(filename, *, strict=False):
# the same links.
seen = {}

# Number of symlinks traversed. When the number of traversals is limited
# by *maxlinks*, this is used instead of *seen* to detect symlink loops.
link_count = 0

while rest:
name = rest.pop()
if name is None:
Expand All @@ -467,38 +475,48 @@ def realpath(filename, *, strict=False):
else:
newpath = path + sep + name
try:
st = os.lstat(newpath)
st = lstat(newpath)
if not stat.S_ISLNK(st.st_mode):
path = newpath
continue
if newpath in seen:
elif maxlinks is not None:
link_count += 1
if link_count > maxlinks:
if strict:
raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
newpath)
path = newpath
continue
elif newpath in seen:
# Already seen this path
path = seen[newpath]
if path is not None:
# use cached value
continue
# The symlink is not resolved, so we must have a symlink loop.
if strict:
# Raise OSError(errno.ELOOP)
os.stat(newpath)
raise OSError(errno.ELOOP, os.strerror(errno.ELOOP),
newpath)
path = newpath
continue
target = os.readlink(newpath)
target = readlink(newpath)
except OSError:
if strict:
raise
path = newpath
continue
# Resolve the symbolic link
seen[newpath] = None # not resolved symlink
if target.startswith(sep):
# Symlink target is absolute; reset resolved path.
path = sep
# Push the symlink path onto the stack, and signal its specialness by
# also pushing None. When these entries are popped, we'll record the
# fully-resolved symlink target in the 'seen' mapping.
rest.append(newpath)
rest.append(None)
if maxlinks is None:
# Mark this symlink as seen but not fully resolved.
seen[newpath] = None
# Push the symlink path onto the stack, and signal its specialness
# by also pushing None. When these entries are popped, we'll
# record the fully-resolved symlink target in the 'seen' mapping.
rest.append(newpath)
rest.append(None)
# Push the unresolved symlink target parts onto the stack.
rest.extend(target.split(sep)[::-1])

Expand Down
Loading