Skip to content

Commit

Permalink
[CVE-2023-6597] tempfile.TemporaryDirectory: fix symlink bug in cleanup
Browse files Browse the repository at this point in the history
Code is from gh#python/cpython!99930, it was released upstream in
3.8.19.

Fixes: bsc#1219666
Patch: CVE-2023-6597-TempDir-cleaning-symlink.patch
  • Loading branch information
kwi-dk authored and mcepl committed Apr 25, 2024
1 parent f47015f commit 3e7a86b
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 13 deletions.
114 changes: 104 additions & 10 deletions Lib/tempfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ def _sanitize_params(prefix, suffix, dir):
return prefix, suffix, dir, output_type


def _infer_return_type(*args):
"""Look at the type of all args and divine their implied return type."""
return_type = None
for arg in args:
if arg is None:
continue
if isinstance(arg, bytes):
if return_type is str:
raise TypeError("Can't mix bytes and non-bytes in "
"path components.")
return_type = bytes
else:
if return_type is bytes:
raise TypeError("Can't mix bytes and non-bytes in "
"path components.")
return_type = str
if return_type is None:
return str # tempfile APIs return a str by default.
return return_type


class _RandomNameSequence:
"""An instance of _RandomNameSequence generates an endless
sequence of unpredictable strings which can safely be incorporated
Expand Down Expand Up @@ -275,6 +296,22 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type):
raise FileExistsError(_errno.EEXIST,
"No usable temporary file name found")

def _dont_follow_symlinks(func, path, *args):
# Pass follow_symlinks=False, unless not supported on this platform.
if func in _os.supports_follow_symlinks:
func(path, *args, follow_symlinks=False)
elif _os.name == 'nt' or not _os.path.islink(path):
func(path, *args)

def _resetperms(path):
try:
chflags = _os.chflags
except AttributeError:
pass
else:
_dont_follow_symlinks(chflags, path, 0)
_dont_follow_symlinks(_os.chmod, path, 0o700)


# User visible interfaces.

Expand Down Expand Up @@ -776,7 +813,7 @@ def writelines(self, iterable):
return rv


class TemporaryDirectory(object):
class TemporaryDirectory:
"""Create and return a temporary directory. This has the same
behavior as mkdtemp but can be used as a context manager. For
example:
Expand All @@ -785,19 +822,75 @@ class TemporaryDirectory(object):
...
Upon exiting the context, the directory and everything contained
in it are removed.
in it are removed (unless delete=False is passed or an exception
is raised during cleanup and ignore_cleanup_errors is not True).
Optional Arguments:
suffix - A str suffix for the directory name. (see mkdtemp)
prefix - A str prefix for the directory name. (see mkdtemp)
dir - A directory to create this temp dir in. (see mkdtemp)
ignore_cleanup_errors - False; ignore exceptions during cleanup?
delete - True; whether the directory is automatically deleted.
"""

def __init__(self, suffix=None, prefix=None, dir=None):
def __init__(self, suffix=None, prefix=None, dir=None,
ignore_cleanup_errors=False, *, delete=True):
self.name = mkdtemp(suffix, prefix, dir)
self._ignore_cleanup_errors = ignore_cleanup_errors
self._delete = delete
self._finalizer = _weakref.finalize(
self, self._cleanup, self.name,
warn_message="Implicitly cleaning up {!r}".format(self))
warn_message="Implicitly cleaning up {!r}".format(self),
ignore_errors=self._ignore_cleanup_errors, delete=self._delete)

@classmethod
def _rmtree(cls, name, ignore_errors=False, repeated=False):
def onexc(func, path, exc_info):
exc = exc_info[1]
if isinstance(exc, PermissionError):
if repeated and path == name:
if ignore_errors:
return
raise

try:
if path != name:
_resetperms(_os.path.dirname(path))
_resetperms(path)

try:
_os.unlink(path)
except IsADirectoryError:
cls._rmtree(path)
except PermissionError:
# The PermissionError handler was originally added for
# FreeBSD in directories, but it seems that it is raised
# on Windows too.
# bpo-43153: Calling _rmtree again may
# raise NotADirectoryError and mask the PermissionError.
# So we must re-raise the current PermissionError if
# path is not a directory.
if not _os.path.isdir(path) or _os.path.isjunction(path):
if ignore_errors:
return
raise
cls._rmtree(path, ignore_errors=ignore_errors,
repeated=(path == name))
except FileNotFoundError:
pass
elif isinstance(exc, FileNotFoundError):
pass
else:
if not ignore_errors:
raise

_shutil.rmtree(name, onerror=onexc)

@classmethod
def _cleanup(cls, name, warn_message):
_shutil.rmtree(name)
_warnings.warn(warn_message, ResourceWarning)
def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True):
if delete:
cls._rmtree(name, ignore_errors=ignore_errors)
_warnings.warn(warn_message, ResourceWarning)

def __repr__(self):
return "<{} {!r}>".format(self.__class__.__name__, self.name)
Expand All @@ -806,8 +899,9 @@ def __enter__(self):
return self.name

def __exit__(self, exc, value, tb):
self.cleanup()
if self._delete:
self.cleanup()

def cleanup(self):
if self._finalizer.detach():
_shutil.rmtree(self.name)
if self._finalizer.detach() or _os.path.exists(self.name):
self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
10 changes: 7 additions & 3 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2382,15 +2382,18 @@ def match_value(self, k, dv, v):
result = dv.find(v) >= 0
return result


_can_symlink = None
def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
symlink_path = TESTFN + "can_symlink"
# WASI / wasmtime prevents symlinks with absolute paths, see man
# openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
# paths. Skip symlink tests on WASI for now.
src = os.path.abspath(TESTFN)
symlink_path = src + "can_symlink"
try:
os.symlink(TESTFN, symlink_path)
os.symlink(src, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
Expand All @@ -2399,6 +2402,7 @@ def can_symlink():
_can_symlink = can
return can


def skip_unless_symlink(test):
"""Skip decorator for tests that require functional symlink"""
ok = can_symlink()
Expand Down
98 changes: 98 additions & 0 deletions Lib/test/test_tempfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ def __exit__(self, *exc_info):
d.clear()
d.update(c)


class TestTemporaryDirectory(BaseTestCase):
"""Test TemporaryDirectory()."""

Expand Down Expand Up @@ -1355,6 +1356,103 @@ def test_cleanup_with_symlink_to_a_directory(self):
"were deleted")
d2.cleanup()

@support.skip_unless_symlink
def test_cleanup_with_symlink_modes(self):
# cleanup() should not follow symlinks when fixing mode bits (#91133)
with self.do_create(recurse=0) as d2:
file1 = os.path.join(d2, 'file1')
open(file1, 'wb').close()
dir1 = os.path.join(d2, 'dir1')
os.mkdir(dir1)
for mode in range(8):
mode <<= 6
with self.subTest(mode=format(mode, '03o')):
def test(target, target_is_directory):
d1 = self.do_create(recurse=0)
symlink = os.path.join(d1.name, 'symlink')
os.symlink(target, symlink,
target_is_directory=target_is_directory)
try:
os.chmod(symlink, mode, follow_symlinks=False)
except NotImplementedError:
pass
try:
os.chmod(symlink, mode)
except FileNotFoundError:
pass
os.chmod(d1.name, mode)
d1.cleanup()
self.assertFalse(os.path.exists(d1.name))

with self.subTest('nonexisting file'):
test('nonexisting', target_is_directory=False)
with self.subTest('nonexisting dir'):
test('nonexisting', target_is_directory=True)

with self.subTest('existing file'):
os.chmod(file1, mode)
old_mode = os.stat(file1).st_mode
test(file1, target_is_directory=False)
new_mode = os.stat(file1).st_mode
self.assertEqual(new_mode, old_mode,
'%03o != %03o' % (new_mode, old_mode))

with self.subTest('existing dir'):
os.chmod(dir1, mode)
old_mode = os.stat(dir1).st_mode
test(dir1, target_is_directory=True)
new_mode = os.stat(dir1).st_mode
self.assertEqual(new_mode, old_mode,
'%03o != %03o' % (new_mode, old_mode))

@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
@support.skip_unless_symlink
def test_cleanup_with_symlink_flags(self):
# cleanup() should not follow symlinks when fixing flags (#91133)
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
self.check_flags(flags)

with self.do_create(recurse=0) as d2:
file1 = os.path.join(d2, 'file1')
open(file1, 'wb').close()
dir1 = os.path.join(d2, 'dir1')
os.mkdir(dir1)
def test(target, target_is_directory):
d1 = self.do_create(recurse=0)
symlink = os.path.join(d1.name, 'symlink')
os.symlink(target, symlink,
target_is_directory=target_is_directory)
try:
os.chflags(symlink, flags, follow_symlinks=False)
except NotImplementedError:
pass
try:
os.chflags(symlink, flags)
except FileNotFoundError:
pass
os.chflags(d1.name, flags)
d1.cleanup()
self.assertFalse(os.path.exists(d1.name))

with self.subTest('nonexisting file'):
test('nonexisting', target_is_directory=False)
with self.subTest('nonexisting dir'):
test('nonexisting', target_is_directory=True)

with self.subTest('existing file'):
os.chflags(file1, flags)
old_flags = os.stat(file1).st_flags
test(file1, target_is_directory=False)
new_flags = os.stat(file1).st_flags
self.assertEqual(new_flags, old_flags)

with self.subTest('existing dir'):
os.chflags(dir1, flags)
old_flags = os.stat(dir1).st_flags
test(dir1, target_is_directory=True)
new_flags = os.stat(dir1).st_flags
self.assertEqual(new_flags, old_flags)

@support.cpython_only
def test_del_on_collection(self):
# A TemporaryDirectory is deleted when garbage collected
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer
dereferences symlinks when working around file system permission errors.

0 comments on commit 3e7a86b

Please sign in to comment.