From 8e4ad875bedd6807a457d024342ebc5e5f10ad5c Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 7 Jun 2023 14:53:48 +0200 Subject: [PATCH] 00397: PEP 706, CVE-2007-4559: Filter API for tarfile.extractall Add API for allowing checks on the content of tar files, allowing callers to mitigate directory traversal (CVE-2007-4559) and related issues. Python 3.12 will warn if this API is not used. Python 3.14 will fail if it's not used. RHEL adds configuration options, by default it will warn and fail like 3.14 upstream. Backport from https://github.com/python/cpython/issues/102950 Change document: https://peps.python.org/pep-0706/ --- Doc/library/shutil.rst | 33 +- Doc/library/tarfile.rst | 447 ++++++++- Doc/whatsnew/3.6.rst | 17 +- Lib/shutil.py | 18 +- Lib/tarfile.py | 359 ++++++- Lib/test/support/__init__.py | 24 + Lib/test/test_shutil.py | 40 +- Lib/test/test_tarfile.py | 926 +++++++++++++++++- ...-03-23-15-24-38.gh-issue-102953.YR4KaK.rst | 4 + 9 files changed, 1790 insertions(+), 78 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2023-03-23-15-24-38.gh-issue-102953.YR4KaK.rst diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst index c3f7bd664029cb..29ef5259cb27aa 100644 --- a/Doc/library/shutil.rst +++ b/Doc/library/shutil.rst @@ -537,7 +537,7 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. Remove the archive format *name* from the list of supported formats. -.. function:: unpack_archive(filename[, extract_dir[, format]]) +.. function:: unpack_archive(filename[, extract_dir[, format[, filter]]]) Unpack an archive. *filename* is the full path of the archive. @@ -551,6 +551,24 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. registered for that extension. In case none is found, a :exc:`ValueError` is raised. + The keyword-only *filter* argument, which was added in Python 3.6.16, + is passed to the underlying unpacking function. + For zip files, *filter* is not accepted. + For tar files, it is recommended to set it to ``'data'``, + unless using features specific to tar and UNIX-like filesystems. + (See :ref:`tarfile-extraction-filter` for details.) + The ``'data'`` filter will become the default for tar files + in Python 3.14. + + .. warning:: + + Never extract archives from untrusted sources without prior inspection. + It is possible that files are created outside of the path specified in + the *extract_dir* argument, e.g. members that have absolute filenames + starting with "/" or filenames with two dots "..". + + .. versionchanged:: 3.6.16 + Added the *filter* argument. .. function:: register_unpack_format(name, extensions, function[, extra_args[, description]]) @@ -559,11 +577,14 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. ``.zip`` for Zip files. *function* is the callable that will be used to unpack archives. The - callable will receive the path of the archive, followed by the directory - the archive must be extracted to. - - When provided, *extra_args* is a sequence of ``(name, value)`` tuples that - will be passed as keywords arguments to the callable. + callable will receive: + + - the path of the archive, as a positional argument; + - the directory the archive must be extracted to, as a positional argument; + - possibly a *filter* keyword argument, if it was given to + :func:`unpack_archive`; + - additional keyword arguments, specified by *extra_args* as a sequence + of ``(name, value)`` tuples. *description* can be provided to describe the format, and will be returned by the :func:`get_unpack_formats` function. diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index 337c061107299c..f6dc9a703820a3 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -199,6 +199,38 @@ The :mod:`tarfile` module defines the following exceptions: Is raised by :meth:`TarInfo.frombuf` if the buffer it gets is invalid. +.. exception:: FilterError + + Base class for members :ref:`refused ` by + filters. + + .. attribute:: tarinfo + + Information about the member that the filter refused to extract, + as :ref:`TarInfo `. + +.. exception:: AbsolutePathError + + Raised to refuse extracting a member with an absolute path. + +.. exception:: OutsideDestinationError + + Raised to refuse extracting a member outside the destination directory. + +.. exception:: SpecialFileError + + Raised to refuse extracting a special file (e.g. a device or pipe). + +.. exception:: AbsoluteLinkError + + Raised to refuse extracting a symbolic link with an absolute path. + +.. exception:: LinkOutsideDestinationError + + Raised to refuse extracting a symbolic link pointing outside the destination + directory. + + The following constants are available at the module level: .. data:: ENCODING @@ -304,11 +336,8 @@ be finalized; only the internally used file object will be closed. See the *debug* can be set from ``0`` (no debug messages) up to ``3`` (all debug messages). The messages are written to ``sys.stderr``. - If *errorlevel* is ``0``, all errors are ignored when using :meth:`TarFile.extract`. - Nevertheless, they appear as error messages in the debug output, when debugging - is enabled. If ``1``, all *fatal* errors are raised as :exc:`OSError` - exceptions. If ``2``, all *non-fatal* errors are raised as :exc:`TarError` - exceptions as well. + *errorlevel* controls how extraction errors are handled, + see :attr:`the corresponding attribute <~TarFile.errorlevel>`. The *encoding* and *errors* arguments define the character encoding to be used for reading or writing the archive and how conversion errors are going @@ -375,7 +404,7 @@ be finalized; only the internally used file object will be closed. See the available. -.. method:: TarFile.extractall(path=".", members=None, *, numeric_owner=False) +.. method:: TarFile.extractall(path=".", members=None, *, numeric_owner=False, filter=None) Extract all members from the archive to the current working directory or directory *path*. If optional *members* is given, it must be a subset of the @@ -389,6 +418,12 @@ be finalized; only the internally used file object will be closed. See the are used to set the owner/group for the extracted files. Otherwise, the named values from the tarfile are used. + The *filter* argument, which was added in Python 3.6.16, specifies how + ``members`` are modified or rejected before extraction. + See :ref:`tarfile-extraction-filter` for details. + It is recommended to set this explicitly depending on which *tar* features + you need to support. + .. warning:: Never extract archives from untrusted sources without prior inspection. @@ -396,14 +431,20 @@ be finalized; only the internally used file object will be closed. See the that have absolute filenames starting with ``"/"`` or filenames with two dots ``".."``. + Set ``filter='data'`` to prevent the most dangerous security issues, + and read the :ref:`tarfile-extraction-filter` section for details. + .. versionchanged:: 3.5 Added the *numeric_owner* parameter. .. versionchanged:: 3.6 The *path* parameter accepts a :term:`path-like object`. + .. versionchanged:: 3.6.16 + Added the *filter* parameter. + -.. method:: TarFile.extract(member, path="", set_attrs=True, *, numeric_owner=False) +.. method:: TarFile.extract(member, path="", set_attrs=True, *, numeric_owner=False, filter=None) Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. *member* @@ -411,9 +452,8 @@ be finalized; only the internally used file object will be closed. See the directory using *path*. *path* may be a :term:`path-like object`. File attributes (owner, mtime, mode) are set unless *set_attrs* is false. - If *numeric_owner* is :const:`True`, the uid and gid numbers from the tarfile - are used to set the owner/group for the extracted files. Otherwise, the named - values from the tarfile are used. + The *numeric_owner* and *filter* arguments are the same as + for :meth:`extractall`. .. note:: @@ -424,6 +464,9 @@ be finalized; only the internally used file object will be closed. See the See the warning for :meth:`extractall`. + Set ``filter='data'`` to prevent the most dangerous security issues, + and read the :ref:`tarfile-extraction-filter` section for details. + .. versionchanged:: 3.2 Added the *set_attrs* parameter. @@ -433,6 +476,9 @@ be finalized; only the internally used file object will be closed. See the .. versionchanged:: 3.6 The *path* parameter accepts a :term:`path-like object`. + .. versionchanged:: 3.6.16 + Added the *filter* parameter. + .. method:: TarFile.extractfile(member) @@ -444,6 +490,56 @@ be finalized; only the internally used file object will be closed. See the .. versionchanged:: 3.3 Return an :class:`io.BufferedReader` object. +.. attribute:: TarFile.errorlevel + + If *errorlevel* is ``0``, errors are ignored when using :meth:`TarFile.extract` + and :meth:`TarFile.extractall`. + Nevertheless, they appear as error messages in the debug output when + *debug* is greater than 0. + If ``1`` (the default), all *fatal* errors are raised as :exc:`OSError` or + :exc:`FilterError` exceptions. If ``2``, all *non-fatal* errors are raised + as :exc:`TarError` exceptions as well. + + Some exceptions, e.g. ones caused by wrong argument types or data + corruption, are always raised. + + Custom :ref:`extraction filters ` + should raise :exc:`FilterError` for *fatal* errors + and :exc:`ExtractError` for *non-fatal* ones. + + Note that when an exception is raised, the archive may be partially + extracted. It is the user’s responsibility to clean up. + +.. attribute:: TarFile.extraction_filter + + .. versionadded:: 3.6.16 + + The :ref:`extraction filter ` used + as a default for the *filter* argument of :meth:`~TarFile.extract` + and :meth:`~TarFile.extractall`. + + The attribute may be ``None`` or a callable. + String names are not allowed for this attribute, unlike the *filter* + argument to :meth:`~TarFile.extract`. + + If ``extraction_filter`` is ``None`` (the default), + calling an extraction method without a *filter* argument will + use the :func:`fully_trusted ` filter for + compatibility with previous Python versions. + + In Python 3.12+, leaving ``extraction_filter=None`` will emit a + ``DeprecationWarning``. + + In Python 3.14+, leaving ``extraction_filter=None`` will cause + extraction methods to use the :func:`data ` filter by default. + + The attribute may be set on instances or overridden in subclasses. + It also is possible to set it on the ``TarFile`` class itself to set a + global default, although, since it affects all uses of *tarfile*, + it is best practice to only do so in top-level applications or + :mod:`site configuration `. + To set a global default this way, a filter function needs to be wrapped in + :func:`staticmethod()` to prevent injection of a ``self`` argument. .. method:: TarFile.add(name, arcname=None, recursive=True, exclude=None, *, filter=None) @@ -522,7 +618,27 @@ permissions, owner etc.), it provides some useful methods to determine its type. It does *not* contain the file's data itself. :class:`TarInfo` objects are returned by :class:`TarFile`'s methods -:meth:`getmember`, :meth:`getmembers` and :meth:`gettarinfo`. +:meth:`~TarFile.getmember`, :meth:`~TarFile.getmembers` and +:meth:`~TarFile.gettarinfo`. + +Modifying the objects returned by :meth:`~!TarFile.getmember` or +:meth:`~!TarFile.getmembers` will affect all subsequent +operations on the archive. +For cases where this is unwanted, you can use :mod:`copy.copy() ` or +call the :meth:`~TarInfo.replace` method to create a modified copy in one step. + +Several attributes can be set to ``None`` to indicate that a piece of metadata +is unused or unknown. +Different :class:`TarInfo` methods handle ``None`` differently: + +- The :meth:`~TarFile.extract` or :meth:`~TarFile.extractall` methods will + ignore the corresponding metadata, leaving it set to a default. +- :meth:`~TarFile.addfile` will fail. +- :meth:`~TarFile.list` will print a placeholder string. + + +.. versionchanged:: 3.6.16 + Added :meth:`~TarInfo.replace` and handling of ``None``. .. class:: TarInfo(name="") @@ -567,13 +683,24 @@ A ``TarInfo`` object has the following public data attributes: .. attribute:: TarInfo.mtime - Time of last modification. + Time of last modification in seconds since the :ref:`epoch `, + as in :attr:`os.stat_result.st_mtime`. + + .. versionchanged:: 3.6.16 + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.mode - Permission bits. + Permission bits, as for :func:`os.chmod`. + .. versionchanged:: 3.6.16 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.type @@ -594,26 +721,61 @@ A ``TarInfo`` object has the following public data attributes: User ID of the user who originally stored this member. + .. versionchanged:: 3.6.16 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.gid Group ID of the user who originally stored this member. + .. versionchanged:: 3.6.16 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.uname User name. + .. versionchanged:: 3.6.16 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.gname Group name. + .. versionchanged:: 3.6.16 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`, causing extraction to skip applying this + attribute. .. attribute:: TarInfo.pax_headers A dictionary containing key-value pairs of an associated pax extended header. +.. method:: TarInfo.replace(name=..., mtime=..., mode=..., linkname=..., + uid=..., gid=..., uname=..., gname=..., + deep=True) + + .. versionadded:: 3.6.16 + + Return a *new* copy of the :class:`!TarInfo` object with the given attributes + changed. For example, to return a ``TarInfo`` with the group name set to + ``'staff'``, use:: + + new_tarinfo = old_tarinfo.replace(gname='staff') + + By default, a deep copy is made. + If *deep* is false, the copy is shallow, i.e. ``pax_headers`` + and any custom attributes are shared with the original ``TarInfo`` object. A :class:`TarInfo` object also provides some convenient query methods: @@ -663,9 +825,259 @@ A :class:`TarInfo` object also provides some convenient query methods: Return :const:`True` if it is one of character device, block device or FIFO. +.. _tarfile-extraction-filter: + +Extraction filters +------------------ + +.. versionadded:: 3.6.16 + +The *tar* format is designed to capture all details of a UNIX-like filesystem, +which makes it very powerful. +Unfortunately, the features make it easy to create tar files that have +unintended -- and possibly malicious -- effects when extracted. +For example, extracting a tar file can overwrite arbitrary files in various +ways (e.g. by using absolute paths, ``..`` path components, or symlinks that +affect later members). + +In most cases, the full functionality is not needed. +Therefore, *tarfile* supports extraction filters: a mechanism to limit +functionality, and thus mitigate some of the security issues. + +.. seealso:: + + :pep:`706` + Contains further motivation and rationale behind the design. + +The *filter* argument to :meth:`TarFile.extract` or :meth:`~TarFile.extractall` +can be: + +* the string ``'fully_trusted'``: Honor all metadata as specified in the + archive. + Should be used if the user trusts the archive completely, or implements + their own complex verification. + +* the string ``'tar'``: Honor most *tar*-specific features (i.e. features of + UNIX-like filesystems), but block features that are very likely to be + surprising or malicious. See :func:`tar_filter` for details. + +* the string ``'data'``: Ignore or block most features specific to UNIX-like + filesystems. Intended for extracting cross-platform data archives. + See :func:`data_filter` for details. + +* ``None`` (default): Use :attr:`TarFile.extraction_filter`. + + If that is also ``None`` (the default), the ``'fully_trusted'`` + filter will be used (for compatibility with earlier versions of Python). + + In Python 3.12, the default will emit a ``DeprecationWarning``. + + In Python 3.14, the ``'data'`` filter will become the default instead. + It's possible to switch earlier; see :attr:`TarFile.extraction_filter`. + +* A callable which will be called for each extracted member with a + :ref:`TarInfo ` describing the member and the destination + path to where the archive is extracted (i.e. the same path is used for all + members):: + + filter(/, member: TarInfo, path: str) -> TarInfo | None + + The callable is called just before each member is extracted, so it can + take the current state of the disk into account. + It can: + + - return a :class:`TarInfo` object which will be used instead of the metadata + in the archive, or + - return ``None``, in which case the member will be skipped, or + - raise an exception to abort the operation or skip the member, + depending on :attr:`~TarFile.errorlevel`. + Note that when extraction is aborted, :meth:`~TarFile.extractall` may leave + the archive partially extracted. It does not attempt to clean up. + +Default named filters +~~~~~~~~~~~~~~~~~~~~~ + +The pre-defined, named filters are available as functions, so they can be +reused in custom filters: + +.. function:: fully_trusted_filter(/, member, path) + + Return *member* unchanged. + + This implements the ``'fully_trusted'`` filter. + +.. function:: tar_filter(/, member, path) + + Implements the ``'tar'`` filter. + + - Strip leading slashes (``/`` and :attr:`os.sep`) from filenames. + - :ref:`Refuse ` to extract files with absolute + paths (in case the name is absolute + even after stripping slashes, e.g. ``C:/foo`` on Windows). + This raises :class:`~tarfile.AbsolutePathError`. + - :ref:`Refuse ` to extract files whose absolute + path (after following symlinks) would end up outside the destination. + This raises :class:`~tarfile.OutsideDestinationError`. + - Clear high mode bits (setuid, setgid, sticky) and group/other write bits + (:attr:`~stat.S_IWGRP`|:attr:`~stat.S_IWOTH`). + + Return the modified ``TarInfo`` member. + +.. function:: data_filter(/, member, path) + + Implements the ``'data'`` filter. + In addition to what ``tar_filter`` does: + + - :ref:`Refuse ` to extract links (hard or soft) + that link to absolute paths, or ones that link outside the destination. + + This raises :class:`~tarfile.AbsoluteLinkError` or + :class:`~tarfile.LinkOutsideDestinationError`. + + Note that such files are refused even on platforms that do not support + symbolic links. + + - :ref:`Refuse ` to extract device files + (including pipes). + This raises :class:`~tarfile.SpecialFileError`. + + - For regular files, including hard links: + + - Set the owner read and write permissions + (:attr:`~stat.S_IRUSR`|:attr:`~stat.S_IWUSR`). + - Remove the group & other executable permission + (:attr:`~stat.S_IXGRP`|:attr:`~stat.S_IXOTH`) + if the owner doesn’t have it (:attr:`~stat.S_IXUSR`). + + - For other files (directories), set ``mode`` to ``None``, so + that extraction methods skip applying permission bits. + - Set user and group info (``uid``, ``gid``, ``uname``, ``gname``) + to ``None``, so that extraction methods skip setting it. + + Return the modified ``TarInfo`` member. + + +.. _tarfile-extraction-refuse: + +Filter errors +~~~~~~~~~~~~~ + +When a filter refuses to extract a file, it will raise an appropriate exception, +a subclass of :class:`~tarfile.FilterError`. +This will abort the extraction if :attr:`TarFile.errorlevel` is 1 or more. +With ``errorlevel=0`` the error will be logged and the member will be skipped, +but extraction will continue. + + +Hints for further verification +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Even with ``filter='data'``, *tarfile* is not suited for extracting untrusted +files without prior inspection. +Among other issues, the pre-defined filters do not prevent denial-of-service +attacks. Users should do additional checks. + +Here is an incomplete list of things to consider: + +* Extract to a :func:`new temporary directory ` + to prevent e.g. exploiting pre-existing links, and to make it easier to + clean up after a failed extraction. +* When working with untrusted data, use external (e.g. OS-level) limits on + disk, memory and CPU usage. +* Check filenames against an allow-list of characters + (to filter out control characters, confusables, foreign path separators, + etc.). +* Check that filenames have expected extensions (discouraging files that + execute when you “click on them”, or extension-less files like Windows special device names). +* Limit the number of extracted files, total size of extracted data, + filename length (including symlink length), and size of individual files. +* Check for files that would be shadowed on case-insensitive filesystems. + +Also note that: + +* Tar files may contain multiple versions of the same file. + Later ones are expected to overwrite any earlier ones. + This feature is crucial to allow updating tape archives, but can be abused + maliciously. +* *tarfile* does not protect against issues with “live” data, + e.g. an attacker tinkering with the destination (or source) directory while + extraction (or archiving) is in progress. + + +Supporting older Python versions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Extraction filters were added to Python 3.12, and are backported to older +versions as security updates. +To check whether the feature is available, use e.g. +``hasattr(tarfile, 'data_filter')`` rather than checking the Python version. + +The following examples show how to support Python versions with and without +the feature. +Note that setting ``extraction_filter`` will affect any subsequent operations. + +* Fully trusted archive:: + + my_tarfile.extraction_filter = (lambda member, path: member) + my_tarfile.extractall() + +* Use the ``'data'`` filter if available, but revert to Python 3.11 behavior + (``'fully_trusted'``) if this feature is not available:: + + my_tarfile.extraction_filter = getattr(tarfile, 'data_filter', + (lambda member, path: member)) + my_tarfile.extractall() + +* Use the ``'data'`` filter; *fail* if it is not available:: + + my_tarfile.extractall(filter=tarfile.data_filter) + + or:: + + my_tarfile.extraction_filter = tarfile.data_filter + my_tarfile.extractall() + +* Use the ``'data'`` filter; *warn* if it is not available:: + + if hasattr(tarfile, 'data_filter'): + my_tarfile.extractall(filter='data') + else: + # remove this when no longer needed + warn_the_user('Extracting may be unsafe; consider updating Python') + my_tarfile.extractall() + + +Stateful extraction filter example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While *tarfile*'s extraction methods take a simple *filter* callable, +custom filters may be more complex objects with an internal state. +It may be useful to write these as context managers, to be used like this:: + + with StatefulFilter() as filter_func: + tar.extractall(path, filter=filter_func) + +Such a filter can be written as, for example:: + + class StatefulFilter: + def __init__(self): + self.file_count = 0 + + def __enter__(self): + return self + + def __call__(self, member, path): + self.file_count += 1 + return member + + def __exit__(self, *exc_info): + print(f'{self.file_count} files extracted') + + .. _tarfile-commandline: .. program:: tarfile + Command-Line Interface ---------------------- @@ -735,6 +1147,15 @@ Command-line options Verbose output. +.. cmdoption:: --filter + + Specifies the *filter* for ``--extract``. + See :ref:`tarfile-extraction-filter` for details. + Only string names are accepted (that is, ``fully_trusted``, ``tar``, + and ``data``). + + .. versionadded:: 3.6.16 + .. _tar-examples: Examples diff --git a/Doc/whatsnew/3.6.rst b/Doc/whatsnew/3.6.rst index 19811f4a81935c..7b018dfed78a33 100644 --- a/Doc/whatsnew/3.6.rst +++ b/Doc/whatsnew/3.6.rst @@ -2489,7 +2489,6 @@ URL by the parser :func:`urllib.parse` preventing such attacks. The removal characters are controlled by a new module level variable ``urllib.parse._UNSAFE_URL_BYTES_TO_REMOVE``. (See :issue:`43882`) - Notable security feature in 3.6.15-13 ===================================== @@ -2503,3 +2502,19 @@ This limit can be configured or disabled by environment variable, command line flag, or :mod:`sys` APIs. See the :ref:`integer string conversion length limitation ` documentation. The default limit is 4300 digits in string form. + +Notable Changes in a backport +============================= + +tarfile +------- + +* The extraction methods in :mod:`tarfile`, and :func:`shutil.unpack_archive`, + have a new a *filter* argument that allows limiting tar features than may be + surprising or dangerous, such as creating files outside the destination + directory. + See :ref:`tarfile-extraction-filter` for details. + In Python 3.12, use without the *filter* argument will show a + :exc:`DeprecationWarning`. + In Python 3.14, the default will switch to ``'data'``. + (Contributed by Petr Viktorin in :pep:`706`.) diff --git a/Lib/shutil.py b/Lib/shutil.py index dd124484547caa..d687de1bbf6910 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -908,7 +908,7 @@ def _unpack_zipfile(filename, extract_dir): finally: zip.close() -def _unpack_tarfile(filename, extract_dir): +def _unpack_tarfile(filename, extract_dir, *, filter=None): """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` """ import tarfile # late import for breaking circular dependency @@ -918,7 +918,7 @@ def _unpack_tarfile(filename, extract_dir): raise ReadError( "%s is not a compressed or uncompressed tar file" % filename) try: - tarobj.extractall(extract_dir) + tarobj.extractall(extract_dir, filter=filter) finally: tarobj.close() @@ -946,7 +946,7 @@ def _find_unpack_format(filename): return name return None -def unpack_archive(filename, extract_dir=None, format=None): +def unpack_archive(filename, extract_dir=None, format=None, *, filter=None): """Unpack an archive. `filename` is the name of the archive. @@ -960,10 +960,19 @@ def unpack_archive(filename, extract_dir=None, format=None): was registered for that extension. In case none is found, a ValueError is raised. + + If `filter` is given, it is passed to the underlying + extraction function. """ if extract_dir is None: extract_dir = os.getcwd() + + if filter is None: + filter_kwargs = {} + else: + filter_kwargs = {'filter': filter} + if format is not None: try: format_info = _UNPACK_FORMATS[format] @@ -971,7 +980,7 @@ def unpack_archive(filename, extract_dir=None, format=None): raise ValueError("Unknown unpack format '{0}'".format(format)) func = format_info[1] - func(filename, extract_dir, **dict(format_info[2])) + func(filename, extract_dir, **dict(format_info[2]), **filter_kwargs) else: # we need to look at the registered unpackers supported extensions format = _find_unpack_format(filename) @@ -980,6 +989,7 @@ def unpack_archive(filename, extract_dir=None, format=None): func = _UNPACK_FORMATS[format][1] kwargs = dict(_UNPACK_FORMATS[format][2]) + kwargs.update(filter_kwargs) func(filename, extract_dir, **kwargs) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 2ea47978ff6f1c..c18590325a87c7 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -48,6 +48,7 @@ import struct import copy import re +import warnings try: import pwd @@ -73,6 +74,7 @@ "ENCODING", "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT", "DEFAULT_FORMAT", "open"] + #--------------------------------------------------------- # tar constants #--------------------------------------------------------- @@ -160,6 +162,8 @@ def stn(s, length, encoding, errors): """Convert a string to a null-terminated bytes object. """ + if s is None: + raise ValueError("metadata cannot contain None") s = s.encode(encoding, errors) return s[:length] + (length - len(s)) * NUL @@ -721,9 +725,127 @@ def __init__(self, tarfile, tarinfo): super().__init__(fileobj) #class ExFileObject + +#----------------------------- +# extraction filters (PEP 706) +#----------------------------- + +class FilterError(TarError): + pass + +class AbsolutePathError(FilterError): + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'member {tarinfo.name!r} has an absolute path') + +class OutsideDestinationError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'{tarinfo.name!r} would be extracted to {path!r}, ' + + 'which is outside the destination') + +class SpecialFileError(FilterError): + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'{tarinfo.name!r} is a special file') + +class AbsoluteLinkError(FilterError): + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'{tarinfo.name!r} is a symlink to an absolute path') + +class LinkOutsideDestinationError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'{tarinfo.name!r} would link to {path!r}, ' + + 'which is outside the destination') + +def _get_filtered_attrs(member, dest_path, for_data=True): + new_attrs = {} + name = member.name + dest_path = os.path.realpath(dest_path) + # Strip leading / (tar's directory separator) from filenames. + # Include os.sep (target OS directory separator) as well. + if name.startswith(('/', os.sep)): + name = new_attrs['name'] = member.path.lstrip('/' + os.sep) + if os.path.isabs(name): + # Path is absolute even after stripping. + # For example, 'C:/foo' on Windows. + raise AbsolutePathError(member) + # Ensure we stay in the destination + target_path = os.path.realpath(os.path.join(dest_path, name)) + if os.path.commonpath([target_path, dest_path]) != dest_path: + raise OutsideDestinationError(member, target_path) + # Limit permissions (no high bits, and go-w) + mode = member.mode + if mode is not None: + # Strip high bits & group/other write bits + mode = mode & 0o755 + if for_data: + # For data, handle permissions & file types + if member.isreg() or member.islnk(): + if not mode & 0o100: + # Clear executable bits if not executable by user + mode &= ~0o111 + # Ensure owner can read & write + mode |= 0o600 + elif member.isdir() or member.issym(): + # Ignore mode for directories & symlinks + mode = None + else: + # Reject special files + raise SpecialFileError(member) + if mode != member.mode: + new_attrs['mode'] = mode + if for_data: + # Ignore ownership for 'data' + if member.uid is not None: + new_attrs['uid'] = None + if member.gid is not None: + new_attrs['gid'] = None + if member.uname is not None: + new_attrs['uname'] = None + if member.gname is not None: + new_attrs['gname'] = None + # Check link destination for 'data' + if member.islnk() or member.issym(): + if os.path.isabs(member.linkname): + raise AbsoluteLinkError(member) + target_path = os.path.realpath(os.path.join(dest_path, member.linkname)) + if os.path.commonpath([target_path, dest_path]) != dest_path: + raise LinkOutsideDestinationError(member, target_path) + return new_attrs + +def fully_trusted_filter(member, dest_path): + return member + +def tar_filter(member, dest_path): + new_attrs = _get_filtered_attrs(member, dest_path, False) + if new_attrs: + return member.replace(**new_attrs, deep=False) + return member + +def data_filter(member, dest_path): + new_attrs = _get_filtered_attrs(member, dest_path, True) + if new_attrs: + return member.replace(**new_attrs, deep=False) + return member + +_NAMED_FILTERS = { + "fully_trusted": fully_trusted_filter, + "tar": tar_filter, + "data": data_filter, +} + #------------------ # Exported Classes #------------------ + +# Sentinel for replace() defaults, meaning "don't change the attribute" +_KEEP = object() + class TarInfo(object): """Informational class which holds the details about an archive member given by a tar header block. @@ -779,12 +901,44 @@ def _setlinkpath(self, linkname): def __repr__(self): return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self)) + def replace(self, *, + name=_KEEP, mtime=_KEEP, mode=_KEEP, linkname=_KEEP, + uid=_KEEP, gid=_KEEP, uname=_KEEP, gname=_KEEP, + deep=True, _KEEP=_KEEP): + """Return a deep copy of self with the given attributes replaced. + """ + if deep: + result = copy.deepcopy(self) + else: + result = copy.copy(self) + if name is not _KEEP: + result.name = name + if mtime is not _KEEP: + result.mtime = mtime + if mode is not _KEEP: + result.mode = mode + if linkname is not _KEEP: + result.linkname = linkname + if uid is not _KEEP: + result.uid = uid + if gid is not _KEEP: + result.gid = gid + if uname is not _KEEP: + result.uname = uname + if gname is not _KEEP: + result.gname = gname + return result + def get_info(self): """Return the TarInfo's attributes as a dictionary. """ + if self.mode is None: + mode = None + else: + mode = self.mode & 0o7777 info = { "name": self.name, - "mode": self.mode & 0o7777, + "mode": mode, "uid": self.uid, "gid": self.gid, "size": self.size, @@ -807,6 +961,9 @@ def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="surrogateescap """Return a tar header as a string of 512 byte blocks. """ info = self.get_info() + for name, value in info.items(): + if value is None: + raise ValueError("%s may not be None" % name) if format == USTAR_FORMAT: return self.create_ustar_header(info, encoding, errors) @@ -920,6 +1077,20 @@ def _create_header(info, format, encoding, errors): """Return a header block. info is a dictionary with file information, format must be one of the *_FORMAT constants. """ + has_device_fields = info.get("type") in (CHRTYPE, BLKTYPE) + if has_device_fields: + devmajor = itn(info.get("devmajor", 0), 8, format) + devminor = itn(info.get("devminor", 0), 8, format) + else: + devmajor = stn("", 8, encoding, errors) + devminor = stn("", 8, encoding, errors) + + # None values in metadata should cause ValueError. + # itn()/stn() do this for all fields except type. + filetype = info.get("type", REGTYPE) + if filetype is None: + raise ValueError("TarInfo.type must not be None") + parts = [ stn(info.get("name", ""), 100, encoding, errors), itn(info.get("mode", 0) & 0o7777, 8, format), @@ -928,7 +1099,7 @@ def _create_header(info, format, encoding, errors): itn(info.get("size", 0), 12, format), itn(info.get("mtime", 0), 12, format), b" ", # checksum field - info.get("type", REGTYPE), + filetype, stn(info.get("linkname", ""), 100, encoding, errors), info.get("magic", POSIX_MAGIC), stn(info.get("uname", ""), 32, encoding, errors), @@ -1410,6 +1581,8 @@ class TarFile(object): fileobject = ExFileObject # The file-object for extractfile(). + extraction_filter = None # The default filter for extraction. + def __init__(self, name=None, mode="r", fileobj=None, format=None, tarinfo=None, dereference=None, ignore_zeros=None, encoding=None, errors="surrogateescape", pax_headers=None, debug=None, @@ -1882,7 +2055,10 @@ def list(self, verbose=True, *, members=None): members = self for tarinfo in members: if verbose: - _safe_print(stat.filemode(tarinfo.mode)) + if tarinfo.mode is None: + _safe_print("??????????") + else: + _safe_print(stat.filemode(tarinfo.mode)) _safe_print("%s/%s" % (tarinfo.uname or tarinfo.uid, tarinfo.gname or tarinfo.gid)) if tarinfo.ischr() or tarinfo.isblk(): @@ -1890,8 +2066,11 @@ def list(self, verbose=True, *, members=None): ("%d,%d" % (tarinfo.devmajor, tarinfo.devminor))) else: _safe_print("%10d" % tarinfo.size) - _safe_print("%d-%02d-%02d %02d:%02d:%02d" \ - % time.localtime(tarinfo.mtime)[:6]) + if tarinfo.mtime is None: + _safe_print("????-??-?? ??:??:??") + else: + _safe_print("%d-%02d-%02d %02d:%02d:%02d" \ + % time.localtime(tarinfo.mtime)[:6]) _safe_print(tarinfo.name + ("/" if tarinfo.isdir() else "")) @@ -1988,32 +2167,58 @@ def addfile(self, tarinfo, fileobj=None): self.members.append(tarinfo) - def extractall(self, path=".", members=None, *, numeric_owner=False): + def _get_filter_function(self, filter): + if filter is None: + filter = self.extraction_filter + if filter is None: + return fully_trusted_filter + if isinstance(filter, str): + raise TypeError( + 'String names are not supported for ' + + 'TarFile.extraction_filter. Use a function such as ' + + 'tarfile.data_filter directly.') + return filter + if callable(filter): + return filter + try: + return _NAMED_FILTERS[filter] + except KeyError: + raise ValueError(f"filter {filter!r} not found") from None + + def extractall(self, path=".", members=None, *, numeric_owner=False, + filter=None): """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterwards. `path' specifies a different directory to extract to. `members' is optional and must be a subset of the list returned by getmembers(). If `numeric_owner` is True, only the numbers for user/group names are used and not the names. + + The `filter` function will be called on each member just + before extraction. + It can return a changed TarInfo or None to skip the member. + String names of common filters are accepted. """ directories = [] + filter_function = self._get_filter_function(filter) if members is None: members = self - for tarinfo in members: + for member in members: + tarinfo = self._get_extract_tarinfo(member, filter_function, path) + if tarinfo is None: + continue if tarinfo.isdir(): - # Extract directories with a safe mode. + # For directories, delay setting attributes until later, + # since permissions can interfere with extraction and + # extracting contents can reset mtime. directories.append(tarinfo) - tarinfo = copy.copy(tarinfo) - tarinfo.mode = 0o700 - # Do not set_attrs directories, as we will do that further down - self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(), - numeric_owner=numeric_owner) + self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(), + numeric_owner=numeric_owner) # Reverse sort directories. - directories.sort(key=lambda a: a.name) - directories.reverse() + directories.sort(key=lambda a: a.name, reverse=True) # Set correct owner, mtime and filemode on directories. for tarinfo in directories: @@ -2023,12 +2228,10 @@ def extractall(self, path=".", members=None, *, numeric_owner=False): self.utime(tarinfo, dirpath) self.chmod(tarinfo, dirpath) except ExtractError as e: - if self.errorlevel > 1: - raise - else: - self._dbg(1, "tarfile: %s" % e) + self._handle_nonfatal_error(e) - def extract(self, member, path="", set_attrs=True, *, numeric_owner=False): + def extract(self, member, path="", set_attrs=True, *, numeric_owner=False, + filter=None): """Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. `member' may be a filename or a TarInfo object. You can @@ -2036,35 +2239,70 @@ def extract(self, member, path="", set_attrs=True, *, numeric_owner=False): mtime, mode) are set unless `set_attrs' is False. If `numeric_owner` is True, only the numbers for user/group names are used and not the names. + + The `filter` function will be called before extraction. + It can return a changed TarInfo or None to skip the member. + String names of common filters are accepted. """ - self._check("r") + filter_function = self._get_filter_function(filter) + tarinfo = self._get_extract_tarinfo(member, filter_function, path) + if tarinfo is not None: + self._extract_one(tarinfo, path, set_attrs, numeric_owner) + def _get_extract_tarinfo(self, member, filter_function, path): + """Get filtered TarInfo (or None) from member, which might be a str""" if isinstance(member, str): tarinfo = self.getmember(member) else: tarinfo = member + unfiltered = tarinfo + try: + tarinfo = filter_function(tarinfo, path) + except (OSError, FilterError) as e: + self._handle_fatal_error(e) + except ExtractError as e: + self._handle_nonfatal_error(e) + if tarinfo is None: + self._dbg(2, "tarfile: Excluded %r" % unfiltered.name) + return None # Prepare the link target for makelink(). if tarinfo.islnk(): + tarinfo = copy.copy(tarinfo) tarinfo._link_target = os.path.join(path, tarinfo.linkname) + return tarinfo + + def _extract_one(self, tarinfo, path, set_attrs, numeric_owner): + """Extract from filtered tarinfo to disk""" + self._check("r") try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name), set_attrs=set_attrs, numeric_owner=numeric_owner) except OSError as e: - if self.errorlevel > 0: - raise - else: - if e.filename is None: - self._dbg(1, "tarfile: %s" % e.strerror) - else: - self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename)) + self._handle_fatal_error(e) except ExtractError as e: - if self.errorlevel > 1: - raise + self._handle_nonfatal_error(e) + + def _handle_nonfatal_error(self, e): + """Handle non-fatal error (ExtractError) according to errorlevel""" + if self.errorlevel > 1: + raise + else: + self._dbg(1, "tarfile: %s" % e) + + def _handle_fatal_error(self, e): + """Handle "fatal" error according to self.errorlevel""" + if self.errorlevel > 0: + raise + elif isinstance(e, OSError): + if e.filename is None: + self._dbg(1, "tarfile: %s" % e.strerror) else: - self._dbg(1, "tarfile: %s" % e) + self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename)) + else: + self._dbg(1, "tarfile: %s %s" % (type(e).__name__, e)) def extractfile(self, member): """Extract a member from the archive as a file object. `member' may be @@ -2150,9 +2388,13 @@ def makedir(self, tarinfo, targetpath): """Make a directory called targetpath. """ try: - # Use a safe mode for the directory, the real mode is set - # later in _extract_member(). - os.mkdir(targetpath, 0o700) + if tarinfo.mode is None: + # Use the system's default mode + os.mkdir(targetpath) + else: + # Use a safe mode for the directory, the real mode is set + # later in _extract_member(). + os.mkdir(targetpath, 0o700) except FileExistsError: pass @@ -2195,6 +2437,9 @@ def makedev(self, tarinfo, targetpath): raise ExtractError("special devices not supported by system") mode = tarinfo.mode + if mode is None: + # Use mknod's default + mode = 0o600 if tarinfo.isblk(): mode |= stat.S_IFBLK else: @@ -2213,7 +2458,6 @@ def makelink(self, tarinfo, targetpath): if tarinfo.issym(): os.symlink(tarinfo.linkname, targetpath) else: - # See extract(). if os.path.exists(tarinfo._link_target): os.link(tarinfo._link_target, targetpath) else: @@ -2238,15 +2482,19 @@ def chown(self, tarinfo, targetpath, numeric_owner): u = tarinfo.uid if not numeric_owner: try: - if grp: + if grp and tarinfo.gname: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: pass try: - if pwd: + if pwd and tarinfo.uname: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: pass + if g is None: + g = -1 + if u is None: + u = -1 try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) @@ -2258,6 +2506,8 @@ def chown(self, tarinfo, targetpath, numeric_owner): def chmod(self, tarinfo, targetpath): """Set file permissions of targetpath according to tarinfo. """ + if tarinfo.mode is None: + return if hasattr(os, 'chmod'): try: os.chmod(targetpath, tarinfo.mode) @@ -2267,10 +2517,13 @@ def chmod(self, tarinfo, targetpath): def utime(self, tarinfo, targetpath): """Set modification time of targetpath according to tarinfo. """ + mtime = tarinfo.mtime + if mtime is None: + return if not hasattr(os, 'utime'): return try: - os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime)) + os.utime(targetpath, (mtime, mtime)) except OSError: raise ExtractError("could not change modification time") @@ -2337,13 +2590,26 @@ def _getmember(self, name, tarinfo=None, normalize=False): members = self.getmembers() # Limit the member search list up to tarinfo. + skipping = False if tarinfo is not None: - members = members[:members.index(tarinfo)] + try: + index = members.index(tarinfo) + except ValueError: + # The given starting point might be a (modified) copy. + # We'll later skip members until we find an equivalent. + skipping = True + else: + # Happy fast path + members = members[:index] if normalize: name = os.path.normpath(name) for member in reversed(members): + if skipping: + if tarinfo.offset == member.offset: + skipping = False + continue if normalize: member_name = os.path.normpath(member.name) else: @@ -2352,6 +2618,10 @@ def _getmember(self, name, tarinfo=None, normalize=False): if name == member_name: return member + if skipping: + # Starting point was not found + raise ValueError(tarinfo) + def _load(self): """Read through the entire archive file and look for readable members. @@ -2444,6 +2714,7 @@ def __exit__(self, type, value, traceback): #-------------------- # exported functions #-------------------- + def is_tarfile(name): """Return True if name points to a tar archive that we are able to handle, else return False. @@ -2465,6 +2736,10 @@ def main(): parser = argparse.ArgumentParser(description=description) parser.add_argument('-v', '--verbose', action='store_true', default=False, help='Verbose output') + parser.add_argument('--filter', metavar='', + choices=_NAMED_FILTERS, + help='Filter for extraction') + group = parser.add_mutually_exclusive_group() group.add_argument('-l', '--list', metavar='', help='Show listing of a tarfile') @@ -2476,8 +2751,12 @@ def main(): help='Create tarfile from sources') group.add_argument('-t', '--test', metavar='', help='Test if a tarfile is valid') + args = parser.parse_args() + if args.filter and not args.extract: + parser.exit(1, '--filter is only valid for extraction\n') + if args.test: src = args.test if is_tarfile(src): @@ -2508,7 +2787,7 @@ def main(): if is_tarfile(src): with TarFile.open(src, 'r:*') as tf: - tf.extractall(path=curdir) + tf.extractall(path=curdir, filter=args.filter) if args.verbose: if curdir == '.': msg = '{!r} file is extracted.'.format(src) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 9ca9ae79e68a29..da1f2ee719381e 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -1244,6 +1244,30 @@ def check_warnings(*filters, **kwargs): return _filterwarnings(filters, quiet) +@contextlib.contextmanager +def check_no_warnings(testcase, message='', category=Warning, force_gc=False): + """Context manager to check that no warnings are emitted. + + This context manager enables a given warning within its scope + and checks that no warnings are emitted even with that warning + enabled. + + If force_gc is True, a garbage collection is attempted before checking + for warnings. This may help to catch warnings emitted when objects + are deleted, such as ResourceWarning. + + Other keyword arguments are passed to warnings.filterwarnings(). + """ + with warnings.catch_warnings(record=True) as warns: + warnings.filterwarnings('always', + message=message, + category=category) + yield + if force_gc: + gc_collect() + testcase.assertEqual(warns, []) + + @contextlib.contextmanager def check_no_resource_warning(testcase): """Context manager to check that no ResourceWarning is emitted. diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 81457c8f09c972..77101251ac5d25 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -1234,12 +1234,16 @@ def test_register_archive_format(self): formats = [name for name, params in get_archive_formats()] self.assertNotIn('xxx', formats) - def check_unpack_archive(self, format): - self.check_unpack_archive_with_converter(format, lambda path: path) - self.check_unpack_archive_with_converter(format, pathlib.Path) - self.check_unpack_archive_with_converter(format, FakePath) + ### shutil.unpack_archive - def check_unpack_archive_with_converter(self, format, converter): + def check_unpack_archive(self, format, **kwargs): + self.check_unpack_archive_with_converter( + format, lambda path: path, **kwargs) + self.check_unpack_archive_with_converter( + format, pathlib.Path, **kwargs) + self.check_unpack_archive_with_converter(format, FakePath, **kwargs) + + def check_unpack_archive_with_converter(self, format, converter, **kwargs): root_dir, base_dir = self._create_files() expected = rlistdir(root_dir) expected.remove('outer') @@ -1249,35 +1253,45 @@ def check_unpack_archive_with_converter(self, format, converter): # let's try to unpack it now tmpdir2 = self.mkdtemp() - unpack_archive(filename, tmpdir2) + unpack_archive(filename, tmpdir2, **kwargs) self.assertEqual(rlistdir(tmpdir2), expected) # and again, this time with the format specified tmpdir3 = self.mkdtemp() - unpack_archive(filename, tmpdir3, format=format) + unpack_archive(filename, tmpdir3, format=format, **kwargs) self.assertEqual(rlistdir(tmpdir3), expected) - self.assertRaises(shutil.ReadError, unpack_archive, TESTFN) - self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx') + with self.assertRaises(shutil.ReadError): + unpack_archive(TESTFN, **kwargs) + with self.assertRaises(ValueError): + unpack_archive(TESTFN, format='xxx', **kwargs) + + def check_unpack_tarball(self, format): + self.check_unpack_archive(format, filter='fully_trusted') + self.check_unpack_archive(format, filter='data') + with support.check_no_warnings(self): + self.check_unpack_archive(format) def test_unpack_archive_tar(self): - self.check_unpack_archive('tar') + self.check_unpack_tarball('tar') @support.requires_zlib def test_unpack_archive_gztar(self): - self.check_unpack_archive('gztar') + self.check_unpack_tarball('gztar') @support.requires_bz2 def test_unpack_archive_bztar(self): - self.check_unpack_archive('bztar') + self.check_unpack_tarball('bztar') @support.requires_lzma def test_unpack_archive_xztar(self): - self.check_unpack_archive('xztar') + self.check_unpack_tarball('xztar') @support.requires_zlib def test_unpack_archive_zip(self): self.check_unpack_archive('zip') + with self.assertRaises(TypeError): + self.check_unpack_archive('zip', filter='data') def test_unpack_registry(self): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 8e0b2759721bfd..f261048615ce94 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -5,6 +5,10 @@ from contextlib import contextmanager from random import Random import pathlib +import shutil +import re +import warnings +import stat import unittest import unittest.mock @@ -2191,7 +2195,12 @@ def test__all__(self): 'EmptyHeaderError', 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 'SubsequentHeaderError', 'ExFileObject', - 'main'} + 'main', + 'fully_trusted_filter', 'data_filter', + 'tar_filter', 'FilterError', 'AbsoluteLinkError', + 'OutsideDestinationError', 'SpecialFileError', + 'AbsolutePathError', 'LinkOutsideDestinationError', + } support.check__all__(self, tarfile, blacklist=blacklist) @@ -2214,6 +2223,15 @@ def make_simple_tarfile(self, tar_name): for tardata in files: tf.add(tardata, arcname=os.path.basename(tardata)) + def make_evil_tarfile(self, tar_name): + files = [support.findfile('tokenize_tests.txt')] + self.addCleanup(support.unlink, tar_name) + with tarfile.open(tar_name, 'w') as tf: + benign = tarfile.TarInfo('benign') + tf.addfile(benign, fileobj=io.BytesIO(b'')) + evil = tarfile.TarInfo('../evil') + tf.addfile(evil, fileobj=io.BytesIO(b'')) + def test_test_command(self): for tar_name in testtarnames: for opt in '-t', '--test': @@ -2357,6 +2375,25 @@ def test_extract_command_verbose(self): finally: support.rmtree(tarextdir) + def test_extract_command_filter(self): + self.make_evil_tarfile(tmpname) + # Make an inner directory, so the member named '../evil' + # is still extracted into `tarextdir` + destdir = os.path.join(tarextdir, 'dest') + os.mkdir(tarextdir) + try: + with support.temp_cwd(destdir): + self.tarfilecmd_failure('-e', tmpname, + '-v', + '--filter', 'data') + out = self.tarfilecmd('-e', tmpname, + '-v', + '--filter', 'fully_trusted', + PYTHONIOENCODING='utf-8') + self.assertIn(b' file is extracted.', out) + finally: + support.rmtree(tarextdir) + def test_extract_command_different_directory(self): self.make_simple_tarfile(tmpname) try: @@ -2630,6 +2667,893 @@ def test_keyword_only(self, mock_geteuid): tarfl.extract, filename_1, TEMPDIR, False, True) +class ReplaceTests(ReadTest, unittest.TestCase): + def test_replace_name(self): + member = self.tar.getmember('ustar/regtype') + replaced = member.replace(name='misc/other') + self.assertEqual(replaced.name, 'misc/other') + self.assertEqual(member.name, 'ustar/regtype') + self.assertEqual(self.tar.getmember('ustar/regtype').name, + 'ustar/regtype') + + def test_replace_deep(self): + member = self.tar.getmember('pax/regtype1') + replaced = member.replace() + replaced.pax_headers['gname'] = 'not-bar' + self.assertEqual(member.pax_headers['gname'], 'bar') + self.assertEqual( + self.tar.getmember('pax/regtype1').pax_headers['gname'], 'bar') + + def test_replace_shallow(self): + member = self.tar.getmember('pax/regtype1') + replaced = member.replace(deep=False) + replaced.pax_headers['gname'] = 'not-bar' + self.assertEqual(member.pax_headers['gname'], 'not-bar') + self.assertEqual( + self.tar.getmember('pax/regtype1').pax_headers['gname'], 'not-bar') + + def test_replace_all(self): + member = self.tar.getmember('ustar/regtype') + for attr_name in ('name', 'mtime', 'mode', 'linkname', + 'uid', 'gid', 'uname', 'gname'): + with self.subTest(attr_name=attr_name): + replaced = member.replace(**{attr_name: None}) + self.assertEqual(getattr(replaced, attr_name), None) + self.assertNotEqual(getattr(member, attr_name), None) + + def test_replace_internal(self): + member = self.tar.getmember('ustar/regtype') + with self.assertRaises(TypeError): + member.replace(offset=123456789) + + +class NoneInfoExtractTests(ReadTest): + # These mainly check that all kinds of members are extracted successfully + # if some metadata is None. + # Some of the methods do additional spot checks. + + # We also test that the default filters can deal with None. + + extraction_filter = None + + @classmethod + def setUpClass(cls): + tar = tarfile.open(tarname, mode='r', encoding="iso8859-1") + cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl" + tar.errorlevel = 0 + tar.extractall(cls.control_dir, filter=cls.extraction_filter) + tar.close() + cls.control_paths = set( + p.relative_to(cls.control_dir) + for p in pathlib.Path(cls.control_dir).glob('**/*')) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.control_dir) + + def check_files_present(self, directory): + got_paths = set( + p.relative_to(directory) + for p in pathlib.Path(directory).glob('**/*')) + self.assertEqual(self.control_paths, got_paths) + + @contextmanager + def extract_with_none(self, *attr_names): + DIR = pathlib.Path(TEMPDIR) / "extractall_none" + self.tar.errorlevel = 0 + for member in self.tar.getmembers(): + for attr_name in attr_names: + setattr(member, attr_name, None) + with support.temp_dir(DIR): + self.tar.extractall(DIR, filter='fully_trusted') + self.check_files_present(DIR) + yield DIR + + def test_extractall_none_mtime(self): + # mtimes of extracted files should be later than 'now' -- the mtime + # of a previously created directory. + now = pathlib.Path(TEMPDIR).stat().st_mtime + with self.extract_with_none('mtime') as DIR: + for path in pathlib.Path(DIR).glob('**/*'): + with self.subTest(path=path): + try: + mtime = path.stat().st_mtime + except OSError: + # Some systems can't stat symlinks, ignore those + if not path.is_symlink(): + raise + else: + self.assertGreaterEqual(path.stat().st_mtime, now) + + def test_extractall_none_mode(self): + # modes of directories and regular files should match the mode + # of a "normally" created directory or regular file + dir_mode = pathlib.Path(TEMPDIR).stat().st_mode + regular_file = pathlib.Path(TEMPDIR) / 'regular_file' + regular_file.write_text('') + regular_file_mode = regular_file.stat().st_mode + with self.extract_with_none('mode') as DIR: + for path in pathlib.Path(DIR).glob('**/*'): + with self.subTest(path=path): + if path.is_dir(): + self.assertEqual(path.stat().st_mode, dir_mode) + elif path.is_file(): + self.assertEqual(path.stat().st_mode, + regular_file_mode) + + def test_extractall_none_uid(self): + with self.extract_with_none('uid'): + pass + + def test_extractall_none_gid(self): + with self.extract_with_none('gid'): + pass + + def test_extractall_none_uname(self): + with self.extract_with_none('uname'): + pass + + def test_extractall_none_gname(self): + with self.extract_with_none('gname'): + pass + + def test_extractall_none_ownership(self): + with self.extract_with_none('uid', 'gid', 'uname', 'gname'): + pass + +class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase): + extraction_filter = 'data' + +class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests, + unittest.TestCase): + extraction_filter = 'fully_trusted' + +class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase): + extraction_filter = 'tar' + +class NoneInfoExtractTests_Default(NoneInfoExtractTests, + unittest.TestCase): + extraction_filter = None + +class NoneInfoTests_Misc(unittest.TestCase): + def test_add(self): + # When addfile() encounters None metadata, it raises a ValueError + bio = io.BytesIO() + for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT, + tarfile.PAX_FORMAT): + with self.subTest(tarformat=tarformat): + tar = tarfile.open(fileobj=bio, mode='w', format=tarformat) + tarinfo = tar.gettarinfo(tarname) + try: + tar.addfile(tarinfo) + except Exception: + if tarformat == tarfile.USTAR_FORMAT: + # In the old, limited format, adding might fail for + # reasons like the UID being too large + pass + else: + raise + else: + for attr_name in ('mtime', 'mode', 'uid', 'gid', + 'uname', 'gname'): + with self.subTest(attr_name=attr_name): + replaced = tarinfo.replace(**{attr_name: None}) + with self.assertRaisesRegex(ValueError, + f"{attr_name}"): + tar.addfile(replaced) + + def test_list(self): + # Change some metadata to None, then compare list() output + # word-for-word. We want list() to not raise, and to only change + # printout for the affected piece of metadata. + # (n.b.: some contents of the test archive are hardcoded.) + for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, + {'uname'}, {'gname'}, + {'uid', 'uname'}, {'gid', 'gname'}): + with self.subTest(attr_names=attr_names), \ + tarfile.open(tarname, encoding="iso8859-1") as tar: + tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio_prev): + tar.list() + for member in tar.getmembers(): + for attr_name in attr_names: + setattr(member, attr_name, None) + tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio_new): + tar.list() + for expected, got in zip(tio_prev.detach().getvalue().split(), + tio_new.detach().getvalue().split()): + if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected): + self.assertEqual(got, b'????-??-??') + elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected): + self.assertEqual(got, b'??:??:??') + elif attr_names == {'mode'} and re.match( + rb'.([r-][w-][x-]){3}', expected): + self.assertEqual(got, b'??????????') + elif attr_names == {'uname'} and expected.startswith( + (b'tarfile/', b'lars/', b'foo/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertRegex(got_user, b'[0-9]+') + elif attr_names == {'gname'} and expected.endswith( + (b'/tarfile', b'/users', b'/bar')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertRegex(got_group, b'[0-9]+') + elif attr_names == {'uid'} and expected.startswith( + (b'1000/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertEqual(got_user, b'None') + elif attr_names == {'gid'} and expected.endswith((b'/100')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertEqual(got_group, b'None') + elif attr_names == {'uid', 'uname'} and expected.startswith( + (b'tarfile/', b'lars/', b'foo/', b'1000/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertEqual(got_user, b'None') + elif attr_names == {'gname', 'gid'} and expected.endswith( + (b'/tarfile', b'/users', b'/bar', b'/100')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertEqual(got_group, b'None') + else: + # In other cases the output should be the same + self.assertEqual(expected, got) + +def _filemode_to_int(mode): + """Inverse of `stat.filemode` (for permission bits) + + Using mode strings rather than numbers makes the later tests more readable. + """ + str_mode = mode[1:] + result = ( + {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] + | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] + | {'x': stat.S_IXUSR, '-': 0, + 's': stat.S_IXUSR | stat.S_ISUID, + 'S': stat.S_ISUID}[str_mode[2]] + | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] + | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] + | {'x': stat.S_IXGRP, '-': 0, + 's': stat.S_IXGRP | stat.S_ISGID, + 'S': stat.S_ISGID}[str_mode[5]] + | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] + | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] + | {'x': stat.S_IXOTH, '-': 0, + 't': stat.S_IXOTH | stat.S_ISVTX, + 'T': stat.S_ISVTX}[str_mode[8]] + ) + # check we did this right + assert stat.filemode(result)[1:] == mode[1:] + + return result + +class ArchiveMaker: + """Helper to create a tar file with specific contents + + Usage: + + with ArchiveMaker() as t: + t.add('filename', ...) + + with t.open() as tar: + ... # `tar` is now a TarFile with 'filename' in it! + """ + def __init__(self): + self.bio = io.BytesIO() + + def __enter__(self): + self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) + return self + + def __exit__(self, *exc): + self.tar_w.close() + self.contents = self.bio.getvalue() + self.bio = None + + def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, + mode=None, **kwargs): + """Add a member to the test archive. Call within `with`.""" + name = str(name) + tarinfo = tarfile.TarInfo(name).replace(**kwargs) + if mode: + tarinfo.mode = _filemode_to_int(mode) + if symlink_to is not None: + type = tarfile.SYMTYPE + tarinfo.linkname = str(symlink_to) + if hardlink_to is not None: + type = tarfile.LNKTYPE + tarinfo.linkname = str(hardlink_to) + if name.endswith('/') and type is None: + type = tarfile.DIRTYPE + if type is not None: + tarinfo.type = type + if tarinfo.isreg(): + fileobj = io.BytesIO(bytes(tarinfo.size)) + else: + fileobj = None + self.tar_w.addfile(tarinfo, fileobj) + + def open(self, **kwargs): + """Open the resulting archive as TarFile. Call after `with`.""" + bio = io.BytesIO(self.contents) + return tarfile.open(fileobj=bio, **kwargs) + + +class TestExtractionFilters(unittest.TestCase): + + # A temporary directory for the extraction results. + # All files that "escape" the destination path should still end + # up in this directory. + outerdir = pathlib.Path(TEMPDIR) / 'outerdir' + + # The destination for the extraction, within `outerdir` + destdir = outerdir / 'dest' + + @contextmanager + def check_context(self, tar, filter): + """Extracts `tar` to `self.destdir` and allows checking the result + + If an error occurs, it must be checked using `expect_exception` + + Otherwise, all resulting files must be checked using `expect_file`, + except the destination directory itself and parent directories of + other files. + When checking directories, do so before their contents. + """ + with support.temp_dir(self.outerdir): + try: + tar.extractall(self.destdir, filter=filter) + except Exception as exc: + self.raised_exception = exc + self.expected_paths = set() + else: + self.raised_exception = None + self.expected_paths = set(self.outerdir.glob('**/*')) + self.expected_paths.discard(self.destdir) + try: + yield + finally: + tar.close() + if self.raised_exception: + raise self.raised_exception + self.assertEqual(self.expected_paths, set()) + + def expect_file(self, name, type=None, symlink_to=None, mode=None): + """Check a single file. See check_context.""" + if self.raised_exception: + raise self.raised_exception + # use normpath() rather than resolve() so we don't follow symlinks + path = pathlib.Path(os.path.normpath(self.destdir / name)) + self.assertIn(path, self.expected_paths) + self.expected_paths.remove(path) + + # When checking mode, ignore Windows (which can only set user read and + # user write bits). Newer versions of Python use `os_helper.can_chmod()` + # instead of hardcoding Windows. + if mode is not None and sys.platform != 'win32': + got = stat.filemode(stat.S_IMODE(path.stat().st_mode)) + self.assertEqual(got, mode) + + if type is None and isinstance(name, str) and name.endswith('/'): + type = tarfile.DIRTYPE + if symlink_to is not None: + got = pathlib.Path(os.readlink(str(self.destdir / name))) + expected = pathlib.Path(symlink_to) + # The symlink might be the same (textually) as what we expect, + # but some systems change the link to an equivalent path, so + # we fall back to samefile(). + if expected != got: + self.assertTrue(got.samefile(expected)) + elif type == tarfile.REGTYPE or type is None: + self.assertTrue(path.is_file()) + elif type == tarfile.DIRTYPE: + self.assertTrue(path.is_dir()) + elif type == tarfile.FIFOTYPE: + self.assertTrue(path.is_fifo()) + else: + raise NotImplementedError(type) + for parent in path.parents: + self.expected_paths.discard(parent) + + def expect_exception(self, exc_type, message_re='.'): + with self.assertRaisesRegex(exc_type, message_re): + if self.raised_exception is not None: + raise self.raised_exception + self.raised_exception = None + + def test_benign_file(self): + with ArchiveMaker() as arc: + arc.add('benign.txt') + for filter in 'fully_trusted', 'tar', 'data': + with self.check_context(arc.open(), filter): + self.expect_file('benign.txt') + + def test_absolute(self): + # Test handling a member with an absolute path + # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add(self.outerdir / 'escaped.evil') + + with self.check_context(arc.open(), 'fully_trusted'): + self.expect_file('../escaped.evil') + + for filter in 'tar', 'data': + with self.check_context(arc.open(), filter): + if str(self.outerdir).startswith('/'): + # We strip leading slashes, as e.g. GNU tar does + # (without --absolute-filenames). + outerdir_stripped = str(self.outerdir).lstrip('/') + self.expect_file(f'{outerdir_stripped}/escaped.evil') + else: + # On this system, absolute paths don't have leading + # slashes. + # So, there's nothing to strip. We refuse to unpack + # to an absolute path, nonetheless. + self.expect_exception( + tarfile.AbsolutePathError, + """['"].*escaped.evil['"] has an absolute path""") + + def test_parent_symlink(self): + # Test interplaying symlinks + # Inspired by 'dirsymlink2a' in jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add('current', symlink_to='.') + arc.add('parent', symlink_to='current/..') + arc.add('parent/evil') + + if support.can_symlink(): + with self.check_context(arc.open(), 'fully_trusted'): + if self.raised_exception is not None: + # Windows will refuse to create a file that's a symlink to itself + # (and tarfile doesn't swallow that exception) + self.expect_exception(FileExistsError) + # The other cases will fail with this error too. + # Skip the rest of this test. + return + else: + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='current/..') + self.expect_file('../evil') + + with self.check_context(arc.open(), 'tar'): + self.expect_exception( + tarfile.OutsideDestinationError, + """'parent/evil' would be extracted to ['"].*evil['"], """ + + "which is outside the destination") + + with self.check_context(arc.open(), 'data'): + self.expect_exception( + tarfile.LinkOutsideDestinationError, + """'parent' would link to ['"].*outerdir['"], """ + + "which is outside the destination") + + else: + # No symlink support. The symlinks are ignored. + with self.check_context(arc.open(), 'fully_trusted'): + self.expect_file('parent/evil') + with self.check_context(arc.open(), 'tar'): + self.expect_file('parent/evil') + with self.check_context(arc.open(), 'data'): + self.expect_file('parent/evil') + + def test_parent_symlink2(self): + # Test interplaying symlinks + # Inspired by 'dirsymlink2b' in jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add('current', symlink_to='.') + arc.add('current/parent', symlink_to='..') + arc.add('parent/evil') + + with self.check_context(arc.open(), 'fully_trusted'): + if support.can_symlink(): + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='..') + self.expect_file('../evil') + else: + self.expect_file('current/') + self.expect_file('parent/evil') + + with self.check_context(arc.open(), 'tar'): + if support.can_symlink(): + self.expect_exception( + tarfile.OutsideDestinationError, + "'parent/evil' would be extracted to " + + """['"].*evil['"], which is outside """ + + "the destination") + else: + self.expect_file('current/') + self.expect_file('parent/evil') + + with self.check_context(arc.open(), 'data'): + self.expect_exception( + tarfile.LinkOutsideDestinationError, + """'current/parent' would link to ['"].*['"], """ + + "which is outside the destination") + + def test_absolute_symlink(self): + # Test symlink to an absolute path + # Inspired by 'dirsymlink' in jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add('parent', symlink_to=self.outerdir) + arc.add('parent/evil') + + with self.check_context(arc.open(), 'fully_trusted'): + if support.can_symlink(): + self.expect_file('parent', symlink_to=self.outerdir) + self.expect_file('../evil') + else: + self.expect_file('parent/evil') + + with self.check_context(arc.open(), 'tar'): + if support.can_symlink(): + self.expect_exception( + tarfile.OutsideDestinationError, + "'parent/evil' would be extracted to " + + """['"].*evil['"], which is outside """ + + "the destination") + else: + self.expect_file('parent/evil') + + with self.check_context(arc.open(), 'data'): + self.expect_exception( + tarfile.AbsoluteLinkError, + "'parent' is a symlink to an absolute path") + + def test_sly_relative0(self): + # Inspired by 'relative0' in jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add('../moo', symlink_to='..//tmp/moo') + + try: + with self.check_context(arc.open(), filter='fully_trusted'): + if support.can_symlink(): + if isinstance(self.raised_exception, FileExistsError): + # XXX TarFile happens to fail creating a parent + # directory. + # This might be a bug, but fixing it would hurt + # security. + # Note that e.g. GNU `tar` rejects '..' components, + # so you could argue this is an invalid archive and we + # just raise an bad type of exception. + self.expect_exception(FileExistsError) + else: + self.expect_file('../moo', symlink_to='..//tmp/moo') + else: + # The symlink can't be extracted and is ignored + pass + except FileExistsError: + pass + + for filter in 'tar', 'data': + with self.check_context(arc.open(), filter): + self.expect_exception( + tarfile.OutsideDestinationError, + "'../moo' would be extracted to " + + "'.*moo', which is outside " + + "the destination") + + def test_sly_relative2(self): + # Inspired by 'relative2' in jwilk/traversal-archives + with ArchiveMaker() as arc: + arc.add('tmp/') + arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo') + + with self.check_context(arc.open(), 'fully_trusted'): + self.expect_file('tmp', type=tarfile.DIRTYPE) + if support.can_symlink(): + self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') + + for filter in 'tar', 'data': + with self.check_context(arc.open(), filter): + self.expect_exception( + tarfile.OutsideDestinationError, + "'tmp/../../moo' would be extracted to " + + """['"].*moo['"], which is outside the """ + + "destination") + + def test_modes(self): + # Test how file modes are extracted + # (Note that the modes are ignored on platforms without working chmod) + with ArchiveMaker() as arc: + arc.add('all_bits', mode='?rwsrwsrwt') + arc.add('perm_bits', mode='?rwxrwxrwx') + arc.add('exec_group_other', mode='?rw-rwxrwx') + arc.add('read_group_only', mode='?---r-----') + arc.add('no_bits', mode='?---------') + arc.add('dir/', mode='?---rwsrwt') + + # On some systems, setting the sticky bit is a no-op. + # Check if that's the case. + tmp_filename = os.path.join(TEMPDIR, "tmp.file") + with open(tmp_filename, 'w'): + pass + os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX) + have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX) + os.unlink(tmp_filename) + + os.mkdir(tmp_filename) + os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX) + have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX) + os.rmdir(tmp_filename) + + with self.check_context(arc.open(), 'fully_trusted'): + if have_sticky_files: + self.expect_file('all_bits', mode='?rwsrwsrwt') + else: + self.expect_file('all_bits', mode='?rwsrwsrwx') + self.expect_file('perm_bits', mode='?rwxrwxrwx') + self.expect_file('exec_group_other', mode='?rw-rwxrwx') + self.expect_file('read_group_only', mode='?---r-----') + self.expect_file('no_bits', mode='?---------') + if have_sticky_dirs: + self.expect_file('dir/', mode='?---rwsrwt') + else: + self.expect_file('dir/', mode='?---rwsrwx') + + with self.check_context(arc.open(), 'tar'): + self.expect_file('all_bits', mode='?rwxr-xr-x') + self.expect_file('perm_bits', mode='?rwxr-xr-x') + self.expect_file('exec_group_other', mode='?rw-r-xr-x') + self.expect_file('read_group_only', mode='?---r-----') + self.expect_file('no_bits', mode='?---------') + self.expect_file('dir/', mode='?---r-xr-x') + + with self.check_context(arc.open(), 'data'): + normal_dir_mode = stat.filemode(stat.S_IMODE( + self.outerdir.stat().st_mode)) + self.expect_file('all_bits', mode='?rwxr-xr-x') + self.expect_file('perm_bits', mode='?rwxr-xr-x') + self.expect_file('exec_group_other', mode='?rw-r--r--') + self.expect_file('read_group_only', mode='?rw-r-----') + self.expect_file('no_bits', mode='?rw-------') + self.expect_file('dir/', mode=normal_dir_mode) + + def test_pipe(self): + # Test handling of a special file + with ArchiveMaker() as arc: + arc.add('foo', type=tarfile.FIFOTYPE) + + for filter in 'fully_trusted', 'tar': + with self.check_context(arc.open(), filter): + if hasattr(os, 'mkfifo'): + self.expect_file('foo', type=tarfile.FIFOTYPE) + else: + # The pipe can't be extracted and is skipped. + pass + + with self.check_context(arc.open(), 'data'): + self.expect_exception( + tarfile.SpecialFileError, + "'foo' is a special file") + + def test_special_files(self): + # Creating device files is tricky. Instead of attempting that let's + # only check the filter result. + for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE: + tarinfo = tarfile.TarInfo('foo') + tarinfo.type = special_type + trusted = tarfile.fully_trusted_filter(tarinfo, '') + self.assertIs(trusted, tarinfo) + tar = tarfile.tar_filter(tarinfo, '') + self.assertEqual(tar.type, special_type) + with self.assertRaises(tarfile.SpecialFileError) as cm: + tarfile.data_filter(tarinfo, '') + self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo) + self.assertEqual(cm.exception.tarinfo.name, 'foo') + + def test_fully_trusted_filter(self): + # The 'fully_trusted' filter returns the original TarInfo objects. + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + filtered = tarfile.fully_trusted_filter(tarinfo, '') + self.assertIs(filtered, tarinfo) + + def test_tar_filter(self): + # The 'tar' filter returns TarInfo objects with the same name/type. + # (It can also fail for particularly "evil" input, but we don't have + # that in the test archive.) + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + filtered = tarfile.tar_filter(tarinfo, '') + self.assertIs(filtered.name, tarinfo.name) + self.assertIs(filtered.type, tarinfo.type) + + def test_data_filter(self): + # The 'data' filter either raises, or returns TarInfo with the same + # name/type. + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + try: + filtered = tarfile.data_filter(tarinfo, '') + except tarfile.FilterError: + continue + self.assertIs(filtered.name, tarinfo.name) + self.assertIs(filtered.type, tarinfo.type) + + def test_default_filter_warns_not(self): + """Ensure the default filter does not warn (like in 3.12)""" + with ArchiveMaker() as arc: + arc.add('foo') + with support.check_no_warnings(self): + with self.check_context(arc.open(), None): + self.expect_file('foo') + + def test_change_default_filter_on_instance(self): + tar = tarfile.TarFile(tarname, 'r') + def strict_filter(tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + else: + return None + tar.extraction_filter = strict_filter + with self.check_context(tar, None): + self.expect_file('ustar/regtype') + + def test_change_default_filter_on_class(self): + def strict_filter(tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + else: + return None + tar = tarfile.TarFile(tarname, 'r') + with support.swap_attr(tarfile.TarFile, 'extraction_filter', + staticmethod(strict_filter)): + with self.check_context(tar, None): + self.expect_file('ustar/regtype') + + def test_change_default_filter_on_subclass(self): + class TarSubclass(tarfile.TarFile): + def extraction_filter(self, tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + else: + return None + + tar = TarSubclass(tarname, 'r') + with self.check_context(tar, None): + self.expect_file('ustar/regtype') + + def test_change_default_filter_to_string(self): + tar = tarfile.TarFile(tarname, 'r') + tar.extraction_filter = 'data' + with self.check_context(tar, None): + self.expect_exception(TypeError) + + def test_custom_filter(self): + def custom_filter(tarinfo, path): + self.assertIs(path, self.destdir) + if tarinfo.name == 'move_this': + return tarinfo.replace(name='moved') + if tarinfo.name == 'ignore_this': + return None + return tarinfo + + with ArchiveMaker() as arc: + arc.add('move_this') + arc.add('ignore_this') + arc.add('keep') + with self.check_context(arc.open(), custom_filter): + self.expect_file('moved') + self.expect_file('keep') + + def test_bad_filter_name(self): + with ArchiveMaker() as arc: + arc.add('foo') + with self.check_context(arc.open(), 'bad filter name'): + self.expect_exception(ValueError) + + def test_stateful_filter(self): + # Stateful filters should be possible. + # (This doesn't really test tarfile. Rather, it demonstrates + # that third parties can implement a stateful filter.) + class StatefulFilter: + def __enter__(self): + self.num_files_processed = 0 + return self + + def __call__(self, tarinfo, path): + try: + tarinfo = tarfile.data_filter(tarinfo, path) + except tarfile.FilterError: + return None + self.num_files_processed += 1 + return tarinfo + + def __exit__(self, *exc_info): + self.done = True + + with ArchiveMaker() as arc: + arc.add('good') + arc.add('bad', symlink_to='/') + arc.add('good') + with StatefulFilter() as custom_filter: + with self.check_context(arc.open(), custom_filter): + self.expect_file('good') + self.assertEqual(custom_filter.num_files_processed, 2) + self.assertEqual(custom_filter.done, True) + + def test_errorlevel(self): + def extracterror_filter(tarinfo, path): + raise tarfile.ExtractError('failed with ExtractError') + def filtererror_filter(tarinfo, path): + raise tarfile.FilterError('failed with FilterError') + def oserror_filter(tarinfo, path): + raise OSError('failed with OSError') + def tarerror_filter(tarinfo, path): + raise tarfile.TarError('failed with base TarError') + def valueerror_filter(tarinfo, path): + raise ValueError('failed with ValueError') + + with ArchiveMaker() as arc: + arc.add('file') + + # If errorlevel is 0, errors affected by errorlevel are ignored + + with self.check_context(arc.open(errorlevel=0), extracterror_filter): + self.expect_file('file') + + with self.check_context(arc.open(errorlevel=0), filtererror_filter): + self.expect_file('file') + + with self.check_context(arc.open(errorlevel=0), oserror_filter): + self.expect_file('file') + + with self.check_context(arc.open(errorlevel=0), tarerror_filter): + self.expect_exception(tarfile.TarError) + + with self.check_context(arc.open(errorlevel=0), valueerror_filter): + self.expect_exception(ValueError) + + # If 1, all fatal errors are raised + + with self.check_context(arc.open(errorlevel=1), extracterror_filter): + self.expect_file('file') + + with self.check_context(arc.open(errorlevel=1), filtererror_filter): + self.expect_exception(tarfile.FilterError) + + with self.check_context(arc.open(errorlevel=1), oserror_filter): + self.expect_exception(OSError) + + with self.check_context(arc.open(errorlevel=1), tarerror_filter): + self.expect_exception(tarfile.TarError) + + with self.check_context(arc.open(errorlevel=1), valueerror_filter): + self.expect_exception(ValueError) + + # If 2, all non-fatal errors are raised as well. + + with self.check_context(arc.open(errorlevel=2), extracterror_filter): + self.expect_exception(tarfile.ExtractError) + + with self.check_context(arc.open(errorlevel=2), filtererror_filter): + self.expect_exception(tarfile.FilterError) + + with self.check_context(arc.open(errorlevel=2), oserror_filter): + self.expect_exception(OSError) + + with self.check_context(arc.open(errorlevel=2), tarerror_filter): + self.expect_exception(tarfile.TarError) + + with self.check_context(arc.open(errorlevel=2), valueerror_filter): + self.expect_exception(ValueError) + + # We only handle ExtractionError, FilterError & OSError specially. + + with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter): + self.expect_exception(TypeError) # errorlevel is not int + + def setUpModule(): support.unlink(TEMPDIR) os.makedirs(TEMPDIR) diff --git a/Misc/NEWS.d/next/Library/2023-03-23-15-24-38.gh-issue-102953.YR4KaK.rst b/Misc/NEWS.d/next/Library/2023-03-23-15-24-38.gh-issue-102953.YR4KaK.rst new file mode 100644 index 00000000000000..48a105a4a17b29 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-03-23-15-24-38.gh-issue-102953.YR4KaK.rst @@ -0,0 +1,4 @@ +The extraction methods in :mod:`tarfile`, and :func:`shutil.unpack_archive`, +have a new a *filter* argument that allows limiting tar features than may be +surprising or dangerous, such as creating files outside the destination +directory. See :ref:`tarfile-extraction-filter` for details.