Skip to content

Commit

Permalink
gh-101566: Sync with zipp 3.14. (GH-102018)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaraco authored Feb 20, 2023
1 parent 84181c1 commit 36854bb
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 56 deletions.
30 changes: 30 additions & 0 deletions Lib/test/test_zipfile/_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import contextlib
import time


class DeadlineExceeded(Exception):
pass


class TimedContext(contextlib.ContextDecorator):
"""
A context that will raise DeadlineExceeded if the
max duration is reached during the execution.
>>> TimedContext(1)(time.sleep)(.1)
>>> TimedContext(0)(time.sleep)(.1)
Traceback (most recent call last):
...
tests._context.DeadlineExceeded: (..., 0)
"""

def __init__(self, max_duration: int):
self.max_duration = max_duration

def __enter__(self):
self.start = time.monotonic()

def __exit__(self, *err):
duration = time.monotonic() - self.start
if duration > self.max_duration:
raise DeadlineExceeded(duration, self.max_duration)
8 changes: 8 additions & 0 deletions Lib/test/test_zipfile/_func_timeout_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
try:
from func_timeout import func_set_timeout as set_timeout
except ImportError: # pragma: no cover
# provide a fallback that doesn't actually time out
from ._context import TimedContext as set_timeout


__all__ = ['set_timeout']
29 changes: 29 additions & 0 deletions Lib/test/test_zipfile/_itertools.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
import itertools


# from jaraco.itertools 6.3.0
class Counter:
"""
Wrap an iterable in an object that stores the count of items
that pass through it.
>>> items = Counter(range(20))
>>> items.count
0
>>> values = list(items)
>>> items.count
20
"""

def __init__(self, i):
self.count = 0
self.iter = zip(itertools.count(1), i)

def __iter__(self):
return self

def __next__(self):
self.count, result = next(self.iter)
return result


# from more_itertools v8.13.0
def always_iterable(obj, base_type=(str, bytes)):
if obj is None:
Expand Down
137 changes: 84 additions & 53 deletions Lib/test/test_zipfile/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,25 @@
import pathlib
import pickle
import string
from test.support.script_helper import assert_python_ok
import sys
import unittest
import zipfile

from ._test_params import parameterize, Invoked
from ._functools import compose
from ._itertools import Counter

from ._test_params import parameterize, Invoked
from ._func_timeout_compat import set_timeout

from test.support.os_helper import temp_dir


# Poor man's technique to consume a (smallish) iterable.
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
class itertools:
class Counter:
def __init__(self, i):
self.count = 0
self._orig_iter = iter(i)
Counter = Counter

def __iter__(self):
return self

def __next__(self):
result = next(self._orig_iter)
self.count += 1
return result
consume = tuple


def add_dirs(zf):
Expand Down Expand Up @@ -161,10 +150,10 @@ def test_open_encoding_utf16(self):
u16 = path.joinpath("16.txt")
with u16.open('r', "utf-16") as strm:
data = strm.read()
self.assertEqual(data, "This was utf-16")
assert data == "This was utf-16"
with u16.open(encoding="utf-16") as strm:
data = strm.read()
self.assertEqual(data, "This was utf-16")
assert data == "This was utf-16"

def test_open_encoding_errors(self):
in_memory_file = io.BytesIO()
Expand All @@ -177,9 +166,9 @@ def test_open_encoding_errors(self):

# encoding= as a positional argument for gh-101144.
data = u16.read_text("utf-8", errors="ignore")
self.assertEqual(data, "invalid utf-8: .")
assert data == "invalid utf-8: ."
with u16.open("r", "utf-8", errors="surrogateescape") as f:
self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")
assert f.read() == "invalid utf-8: \udcff\udcff."

# encoding= both positional and keyword is an error; gh-101144.
with self.assertRaisesRegex(TypeError, "encoding"):
Expand All @@ -191,24 +180,21 @@ def test_open_encoding_errors(self):
with self.assertRaises(UnicodeDecodeError):
f.read()

def test_encoding_warnings(self):
@unittest.skipIf(
not getattr(sys.flags, 'warn_default_encoding', 0),
"Requires warn_default_encoding",
)
@pass_alpharep
def test_encoding_warnings(self, alpharep):
"""EncodingWarning must blame the read_text and open calls."""
code = '''\
import io, zipfile
with zipfile.ZipFile(io.BytesIO(), "w") as zf:
zf.filename = '<test_encoding_warnings in memory zip file>'
zf.writestr("path/file.txt", b"Spanish Inquisition")
root = zipfile.Path(zf)
(path,) = root.iterdir()
file_path = path.joinpath("file.txt")
unused = file_path.read_text() # should warn
file_path.open("r").close() # should warn
'''
proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
warnings = proc.err.splitlines()
self.assertEqual(len(warnings), 2, proc.err)
self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")
assert sys.flags.warn_default_encoding
root = zipfile.Path(alpharep)
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").read_text()
assert __file__ == wc.filename
with self.assertWarns(EncodingWarning) as wc:
root.joinpath("a.txt").open("r").close()
assert __file__ == wc.filename

def test_open_write(self):
"""
Expand Down Expand Up @@ -250,7 +236,8 @@ def test_read(self, alpharep):
root = zipfile.Path(alpharep)
a, b, g = root.iterdir()
assert a.read_text(encoding="utf-8") == "content of a"
a.read_text("utf-8") # No positional arg TypeError per gh-101144.
# Also check positional encoding arg (gh-101144).
assert a.read_text("utf-8") == "content of a"
assert a.read_bytes() == b"content of a"

@pass_alpharep
Expand All @@ -275,19 +262,6 @@ def test_traverse_truediv(self, alpharep):
e = root / "b" / "d" / "e.txt"
assert e.read_text(encoding="utf-8") == "content of e"

@pass_alpharep
def test_traverse_simplediv(self, alpharep):
"""
Disable the __future__.division when testing traversal.
"""
code = compile(
source="zipfile.Path(alpharep) / 'a'",
filename="(test)",
mode="eval",
dont_inherit=True,
)
eval(code)

@pass_alpharep
def test_pathlike_construction(self, alpharep):
"""
Expand Down Expand Up @@ -356,7 +330,7 @@ def test_joinpath_constant_time(self):
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

# @func_timeout.func_set_timeout(3)
@set_timeout(3)
def test_implied_dirs_performance(self):
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
zipfile.CompleteDirs._implied_dirs(data)
Expand Down Expand Up @@ -472,6 +446,52 @@ def test_root_unnamed(self, alpharep):
assert sub.name == "b"
assert sub.parent

@pass_alpharep
def test_match_and_glob(self, alpharep):
root = zipfile.Path(alpharep)
assert not root.match("*.txt")

assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]

files = root.glob("**/*.txt")
assert all(each.match("*.txt") for each in files)

assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))

def test_glob_empty(self):
root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
with self.assertRaises(ValueError):
root.glob('')

@pass_alpharep
def test_eq_hash(self, alpharep):
root = zipfile.Path(alpharep)
assert root == zipfile.Path(alpharep)

assert root != (root / "a.txt")
assert (root / "a.txt") == (root / "a.txt")

root = zipfile.Path(alpharep)
assert root in {root}

@pass_alpharep
def test_is_symlink(self, alpharep):
"""
See python/cpython#82102 for symlink support beyond this object.
"""

root = zipfile.Path(alpharep)
assert not root.is_symlink()

@pass_alpharep
def test_relative_to(self, alpharep):
root = zipfile.Path(alpharep)
relative = root.joinpath("b", "c.txt").relative_to(root / "b")
assert str(relative) == "c.txt"

relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
assert str(relative) == "d/e.txt"

@pass_alpharep
def test_inheritance(self, alpharep):
cls = type('PathChild', (zipfile.Path,), {})
Expand All @@ -493,3 +513,14 @@ def test_pickle(self, alpharep, path_type, subpath):
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
assert first.read_text().startswith('content of ')

@pass_alpharep
def test_extract_orig_with_implied_dirs(self, alpharep):
"""
A zip file wrapped in a Path should extract even with implied dirs.
"""
source_path = self.zipfile_ondisk(alpharep)
zf = zipfile.ZipFile(source_path)
# wrap the zipfile for its side effect
zipfile.Path(zf)
zf.extractall(source_path.parent)
63 changes: 60 additions & 3 deletions Lib/zipfile/_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import itertools
import contextlib
import pathlib
import re
import fnmatch


__all__ = ['Path']
Expand Down Expand Up @@ -93,7 +95,7 @@ def _implied_dirs(names):
return _dedupe(_difference(as_dirs, names))

def namelist(self):
names = super(CompleteDirs, self).namelist()
names = super().namelist()
return names + list(self._implied_dirs(names))

def _name_set(self):
Expand All @@ -109,6 +111,17 @@ def resolve_dir(self, name):
dir_match = name not in names and dirname in names
return dirname if dir_match else name

def getinfo(self, name):
"""
Supplement getinfo for implied dirs.
"""
try:
return super().getinfo(name)
except KeyError:
if not name.endswith('/') or name not in self._name_set():
raise
return zipfile.ZipInfo(filename=name)

@classmethod
def make(cls, source):
"""
Expand Down Expand Up @@ -138,13 +151,13 @@ class FastLookup(CompleteDirs):
def namelist(self):
with contextlib.suppress(AttributeError):
return self.__names
self.__names = super(FastLookup, self).namelist()
self.__names = super().namelist()
return self.__names

def _name_set(self):
with contextlib.suppress(AttributeError):
return self.__lookup
self.__lookup = super(FastLookup, self)._name_set()
self.__lookup = super()._name_set()
return self.__lookup


Expand Down Expand Up @@ -246,6 +259,18 @@ def __init__(self, root, at=""):
self.root = FastLookup.make(root)
self.at = at

def __eq__(self, other):
"""
>>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
False
"""
if self.__class__ is not other.__class__:
return NotImplemented
return (self.root, self.at) == (other.root, other.at)

def __hash__(self):
return hash((self.root, self.at))

def open(self, mode='r', *args, pwd=None, **kwargs):
"""
Open this entry as text or binary following the semantics
Expand Down Expand Up @@ -316,6 +341,38 @@ def iterdir(self):
subs = map(self._next, self.root.namelist())
return filter(self._is_child, subs)

def match(self, path_pattern):
return pathlib.Path(self.at).match(path_pattern)

def is_symlink(self):
"""
Return whether this path is a symlink. Always false (python/cpython#82102).
"""
return False

def _descendants(self):
for child in self.iterdir():
yield child
if child.is_dir():
yield from child._descendants()

def glob(self, pattern):
if not pattern:
raise ValueError(f"Unacceptable pattern: {pattern!r}")

matches = re.compile(fnmatch.translate(pattern)).fullmatch
return (
child
for child in self._descendants()
if matches(str(child.relative_to(self)))
)

def rglob(self, pattern):
return self.glob(f'**/{pattern}')

def relative_to(self, other, *extra):
return posixpath.relpath(str(self), str(other.joinpath(*extra)))

def __str__(self):
return posixpath.join(self.root.filename, self.at)

Expand Down
Loading

0 comments on commit 36854bb

Please sign in to comment.