diff --git a/Doc/library/pathlib.rst b/Doc/library/pathlib.rst index f139abd2454d69..a74b1321cb4b1d 100644 --- a/Doc/library/pathlib.rst +++ b/Doc/library/pathlib.rst @@ -1645,6 +1645,34 @@ Copying, renaming and deleting Remove this directory. The directory must be empty. +.. method:: Path.rmtree(ignore_errors=False, on_error=None) + + Recursively delete this entire directory tree. The path must not refer to a symlink. + + If *ignore_errors* is true, errors resulting from failed removals will be + ignored. If *ignore_errors* is false or omitted, and a function is given to + *on_error*, it will be called each time an exception is raised. If neither + *ignore_errors* nor *on_error* are supplied, exceptions are propagated to + the caller. + + .. note:: + + On platforms that support the necessary fd-based functions, a symlink + attack-resistant version of :meth:`~Path.rmtree` is used by default. On + other platforms, the :func:`~Path.rmtree` implementation is susceptible + to a symlink attack: given proper timing and circumstances, attackers + can manipulate symlinks on the filesystem to delete files they would not + be able to access otherwise. + + If the optional argument *on_error* is specified, it should be a callable; + it will be called with one argument of type :exc:`OSError`. The + callable can handle the error to continue the deletion process or re-raise + it to stop. Note that the filename is available as the :attr:`~OSError.filename` + attribute of the exception object. + + .. versionadded:: 3.14 + + Permissions and ownership ^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 21eb7c3e6efa4a..6f57733470565e 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -118,11 +118,15 @@ os pathlib ------- -* Add :meth:`pathlib.Path.copy`, which copies the content of one file to - another, like :func:`shutil.copyfile`. - (Contributed by Barney Gale in :gh:`73991`.) -* Add :meth:`pathlib.Path.copytree`, which copies one directory tree to - another. +* Add methods to :class:`pathlib.Path` to recursively copy or remove files: + + * :meth:`~pathlib.Path.copy` copies the content of one file to another, like + :func:`shutil.copyfile`. + * :meth:`~pathlib.Path.copytree` copies one directory tree to another, like + :func:`shutil.copytree`. + * :meth:`~pathlib.Path.rmtree` recursively removes a directory tree, like + :func:`shutil.rmtree`. + (Contributed by Barney Gale in :gh:`73991`.) pdb diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py index 05f55badd77c58..49e8e4ca13782c 100644 --- a/Lib/pathlib/_abc.py +++ b/Lib/pathlib/_abc.py @@ -915,6 +915,47 @@ def rmdir(self): """ raise UnsupportedOperation(self._unsupported_msg('rmdir()')) + def rmtree(self, ignore_errors=False, on_error=None): + """ + Recursively delete this directory tree. + + If *ignore_errors* is true, exceptions raised from scanning the tree + and removing files and directories are ignored. Otherwise, if + *on_error* is set, it will be called to handle the error. If neither + *ignore_errors* nor *on_error* are set, exceptions are propagated to + the caller. + """ + if ignore_errors: + def on_error(err): + pass + elif on_error is None: + def on_error(err): + raise err + try: + if self.is_symlink(): + raise OSError("Cannot call rmtree on a symbolic link") + elif self.is_junction(): + raise OSError("Cannot call rmtree on a junction") + results = self.walk( + on_error=on_error, + top_down=False, # Bottom-up so we rmdir() empty directories. + follow_symlinks=False) + for dirpath, dirnames, filenames in results: + for name in filenames: + try: + dirpath.joinpath(name).unlink() + except OSError as err: + on_error(err) + for name in dirnames: + try: + dirpath.joinpath(name).rmdir() + except OSError as err: + on_error(err) + self.rmdir() + except OSError as err: + err.filename = str(self) + on_error(err) + def owner(self, *, follow_symlinks=True): """ Return the login name of the file owner. diff --git a/Lib/pathlib/_local.py b/Lib/pathlib/_local.py index eae8a30c876f19..4fd5279f9fe9ce 100644 --- a/Lib/pathlib/_local.py +++ b/Lib/pathlib/_local.py @@ -830,6 +830,25 @@ def rmdir(self): """ os.rmdir(self) + def rmtree(self, ignore_errors=False, on_error=None): + """ + Recursively delete this directory tree. + + If *ignore_errors* is true, exceptions raised from scanning the tree + and removing files and directories are ignored. Otherwise, if + *on_error* is set, it will be called to handle the error. If neither + *ignore_errors* nor *on_error* are set, exceptions are propagated to + the caller. + """ + if on_error: + def onexc(func, filename, err): + err.filename = filename + on_error(err) + else: + onexc = None + import shutil + shutil.rmtree(str(self), ignore_errors, onexc=onexc) + def rename(self, target): """ Rename this path to the target path. diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 1328a8695b0cca..e17e7d71b6ab46 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -16,6 +16,7 @@ from test.support import import_helper from test.support import is_emscripten, is_wasi from test.support import infinite_recursion +from test.support import swap_attr from test.support import os_helper from test.support.os_helper import TESTFN, FakePath from test.test_pathlib import test_pathlib_abc @@ -31,6 +32,10 @@ if hasattr(os, 'geteuid'): root_in_posix = (os.geteuid() == 0) +rmtree_use_fd_functions = ( + {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and + os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks) + # # Tests for the pure classes. # @@ -827,6 +832,252 @@ def test_group_no_follow_symlinks(self): self.assertEqual(expected_gid, gid_2) self.assertEqual(expected_name, link.group(follow_symlinks=False)) + def test_rmtree_uses_safe_fd_version_if_available(self): + if rmtree_use_fd_functions: + d = self.cls(self.base, 'a') + d.mkdir() + try: + real_open = os.open + + class Called(Exception): + pass + + def _raiser(*args, **kwargs): + raise Called + + os.open = _raiser + self.assertRaises(Called, d.rmtree) + finally: + os.open = real_open + + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @os_helper.skip_if_dac_override + @os_helper.skip_unless_working_chmod + def test_rmtree_unwritable(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + child_file_path = tmp / 'a' + child_dir_path = tmp / 'b' + child_file_path.write_text("") + child_dir_path.mkdir() + old_dir_mode = tmp.stat().st_mode + old_child_file_mode = child_file_path.stat().st_mode + old_child_dir_mode = child_dir_path.stat().st_mode + # Make unwritable. + new_mode = stat.S_IREAD | stat.S_IEXEC + try: + child_file_path.chmod(new_mode) + child_dir_path.chmod(new_mode) + tmp.chmod(new_mode) + + errors = [] + tmp.rmtree(on_error=errors.append) + # Test whether onerror has actually been called. + print(errors) + self.assertEqual(len(errors), 3) + finally: + tmp.chmod(old_dir_mode) + child_file_path.chmod(old_child_file_mode) + child_dir_path.chmod(old_child_dir_mode) + + @needs_windows + def test_rmtree_inner_junction(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir1 = tmp / 'dir1' + dir2 = dir1 / 'dir2' + dir3 = tmp / 'dir3' + for d in dir1, dir2, dir3: + d.mkdir() + file1 = tmp / 'file1' + file1.write_text('foo') + link1 = dir1 / 'link1' + _winapi.CreateJunction(str(dir2), str(link1)) + link2 = dir1 / 'link2' + _winapi.CreateJunction(str(dir3), str(link2)) + link3 = dir1 / 'link3' + _winapi.CreateJunction(str(file1), str(link3)) + # make sure junctions are removed but not followed + dir1.rmtree() + self.assertFalse(dir1.exists()) + self.assertTrue(dir3.exists()) + self.assertTrue(file1.exists()) + + @needs_windows + def test_rmtree_outer_junction(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + try: + src = tmp / 'cheese' + dst = tmp / 'shop' + src.mkdir() + spam = src / 'spam' + spam.write_text('') + _winapi.CreateJunction(str(src), str(dst)) + self.assertRaises(OSError, dst.rmtree) + dst.rmtree(ignore_errors=True) + finally: + tmp.rmtree(ignore_errors=True) + + @needs_windows + def test_rmtree_outer_junction_on_error(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir_ = tmp / 'dir' + dir_.mkdir() + link = tmp / 'link' + _winapi.CreateJunction(str(dir_), str(link)) + try: + self.assertRaises(OSError, link.rmtree) + self.assertTrue(dir_.exists()) + self.assertTrue(link.exists(follow_symlinks=False)) + errors = [] + + def on_error(error): + errors.append(error) + + link.rmtree(on_error=on_error) + self.assertEqual(len(errors), 1) + self.assertIsInstance(errors[0], OSError) + self.assertEqual(errors[0].filename, str(link)) + finally: + os.unlink(str(link)) + + @unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree") + def test_rmtree_fails_on_close(self): + # Test that the error handler is called for failed os.close() and that + # os.close() is only called once for a file descriptor. + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir1 = tmp / 'dir1' + dir1.mkdir() + dir2 = dir1 / 'dir2' + dir2.mkdir() + + def close(fd): + orig_close(fd) + nonlocal close_count + close_count += 1 + raise OSError + + close_count = 0 + with swap_attr(os, 'close', close) as orig_close: + with self.assertRaises(OSError): + dir1.rmtree() + self.assertTrue(dir2.is_dir()) + self.assertEqual(close_count, 2) + + close_count = 0 + errors = [] + + with swap_attr(os, 'close', close) as orig_close: + dir1.rmtree(on_error=errors.append) + print(errors) + self.assertEqual(len(errors), 2) + self.assertEqual(errors[0].filename, str(dir2)) + self.assertEqual(errors[1].filename, str(dir1)) + self.assertEqual(close_count, 2) + + @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') + @unittest.skipIf(sys.platform == "vxworks", + "fifo requires special path on VxWorks") + def test_rmtree_on_named_pipe(self): + p = self.cls(self.base, 'pipe') + os.mkfifo(p) + try: + with self.assertRaises(NotADirectoryError): + p.rmtree() + self.assertTrue(p.exists()) + finally: + p.unlink() + + p = self.cls(self.base, 'dir') + p.mkdir() + os.mkfifo(p / 'mypipe') + p.rmtree() + self.assertFalse(p.exists()) + + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @os_helper.skip_if_dac_override + @os_helper.skip_unless_working_chmod + def test_rmtree_deleted_race_condition(self): + # bpo-37260 + # + # Test that a file or a directory deleted after it is enumerated + # by scandir() but before unlink() or rmdr() is called doesn't + # generate any errors. + def on_error(exc): + assert exc.filename + if not isinstance(exc, PermissionError): + raise + # Make the parent and the children writeable. + for p, mode in zip(paths, old_modes): + p.chmod(mode) + # Remove other dirs except one. + keep = next(p for p in dirs if str(p) != exc.filename) + for p in dirs: + if p != keep: + p.rmdir() + # Remove other files except one. + keep = next(p for p in files if str(p) != exc.filename) + for p in files: + if p != keep: + p.unlink() + + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + paths = [tmp] + [tmp / f'child{i}' for i in range(6)] + dirs = paths[1::2] + files = paths[2::2] + for path in dirs: + path.mkdir() + for path in files: + path.write_text('') + + old_modes = [path.stat().st_mode for path in paths] + + # Make the parent and the children non-writeable. + new_mode = stat.S_IREAD | stat.S_IEXEC + for path in reversed(paths): + path.chmod(new_mode) + + try: + tmp.rmtree(on_error=on_error) + except: + # Test failed, so cleanup artifacts. + for path, mode in zip(paths, old_modes): + try: + path.chmod(mode) + except OSError: + pass + tmp.rmtree() + raise + + def test_rmtree_does_not_choke_on_failing_lstat(self): + try: + orig_lstat = os.lstat + tmp = self.cls(self.base, 'rmtree') + + def raiser(fn, *args, **kwargs): + if fn != str(tmp): + raise OSError() + else: + return orig_lstat(fn) + + os.lstat = raiser + + tmp.mkdir() + foo = tmp / 'foo' + foo.write_text('') + tmp.rmtree() + finally: + os.lstat = orig_lstat + @os_helper.skip_unless_hardlink def test_hardlink_to(self): P = self.cls(self.base) diff --git a/Lib/test/test_pathlib/test_pathlib_abc.py b/Lib/test/test_pathlib/test_pathlib_abc.py index 28c9664cc90fe1..37678c5d799e9a 100644 --- a/Lib/test/test_pathlib/test_pathlib_abc.py +++ b/Lib/test/test_pathlib/test_pathlib_abc.py @@ -2641,6 +2641,105 @@ def test_rmdir(self): self.assertFileNotFound(p.stat) self.assertFileNotFound(p.unlink) + def test_rmtree(self): + base = self.cls(self.base) + base.joinpath('dirA').rmtree() + self.assertRaises(FileNotFoundError, base.joinpath('dirA').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirA', 'linkC').lstat) + base.joinpath('dirB').rmtree() + self.assertRaises(FileNotFoundError, base.joinpath('dirB').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'fileB').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirB', 'linkD').lstat) + base.joinpath('dirC').rmtree() + self.assertRaises(FileNotFoundError, base.joinpath('dirC').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'dirD', 'fileD').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'fileC').stat) + self.assertRaises(FileNotFoundError, base.joinpath('dirC', 'novel.txt').stat) + + def test_rmtree_errors(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + # filename is guaranteed not to exist + filename = tmp / 'foo' + self.assertRaises(FileNotFoundError, filename.rmtree) + # test that ignore_errors option is honored + filename.rmtree(ignore_errors=True) + + # existing file + filename = tmp / "tstfile" + filename.write_text("") + with self.assertRaises(NotADirectoryError) as cm: + filename.rmtree() + self.assertEqual(cm.exception.filename, str(filename)) + self.assertTrue(filename.exists()) + # test that ignore_errors option is honored + filename.rmtree(ignore_errors=True) + self.assertTrue(filename.exists()) + + def test_rmtree_on_error(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + filename = tmp / "tstfile" + filename.write_text("") + errors = [] + + def on_error(error): + errors.append(error) + + filename.rmtree(on_error=on_error) + self.assertEqual(len(errors), 2) + # First from scandir() + self.assertIsInstance(errors[0], NotADirectoryError) + self.assertEqual(errors[0].filename, str(filename)) + # Then from munlink() + self.assertIsInstance(errors[1], NotADirectoryError) + self.assertEqual(errors[1].filename, str(filename)) + + @needs_symlinks + def test_rmtree_outer_symlink(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir_ = tmp / 'dir' + dir_.mkdir() + link = tmp / 'link' + link.symlink_to(dir_) + self.assertRaises(OSError, link.rmtree) + self.assertTrue(dir_.exists()) + self.assertTrue(link.exists(follow_symlinks=False)) + errors = [] + + def on_error(error): + errors.append(error) + + link.rmtree(on_error=on_error) + self.assertEqual(len(errors), 1) + self.assertIsInstance(errors[0], OSError) + self.assertEqual(errors[0].filename, str(link)) + + @needs_symlinks + def test_rmtree_inner_symlink(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir1 = tmp / 'dir1' + dir2 = dir1 / 'dir2' + dir3 = tmp / 'dir3' + for d in dir1, dir2, dir3: + d.mkdir() + file1 = tmp / 'file1' + file1.write_text('foo') + link1 = dir1 / 'link1' + link1.symlink_to(dir2) + link2 = dir1 / 'link2' + link2.symlink_to(dir3) + link3 = dir1 / 'link3' + link3.symlink_to(file1) + # make sure symlinks are removed but not followed + dir1.rmtree() + self.assertFalse(dir1.exists()) + self.assertTrue(dir3.exists()) + self.assertTrue(file1.exists()) + def setUpWalk(self): # Build: # TESTFN/ diff --git a/Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst b/Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst new file mode 100644 index 00000000000000..9aa7a7dba666af --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-05-15-01-21-44.gh-issue-73991.bNDqQN.rst @@ -0,0 +1 @@ +Add :meth:`pathlib.Path.rmtree`, which recursively removes a directory.