diff --git a/pfio/v2/zip.py b/pfio/v2/zip.py index 638227bb..02d242a8 100644 --- a/pfio/v2/zip.py +++ b/pfio/v2/zip.py @@ -5,12 +5,36 @@ from datetime import datetime from typing import Optional, Set +from ._profiler import record, record_iterable from .fs import FS, FileStat, format_repr logger = logging.getLogger(__name__) logger.addHandler(logging.StreamHandler()) +class ZipProfileIOWrapper: + def __init__(self, fp): + self.fp = fp + + def __enter__(self): + self.fp.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + with record("pfio.v2.Zip:exit-context", trace=True): + self.fp.__exit__(exc_type, exc_value, traceback) + + def __getattr__(self, name): + attr = getattr(self.fp, name) + if callable(attr): + def wrapper(*args, **kwargs): + with record(f"pfio.v2.Zip:{attr.__name__}", trace=True): + return attr(*args, **kwargs) + return wrapper + else: + return attr + + class ZipFileStat(FileStat): """Detailed information of a file in a Zip @@ -51,13 +75,15 @@ def __init__(self, zip_info): class Zip(FS): _readonly = True - def __init__(self, backend, file_path, mode='r', create=False, - local_cache=False, local_cachedir=None, **kwargs): + def __init__(self, backend, file_path, mode='r', + create=False, local_cache=False, local_cachedir=None, + trace=False, **kwargs): super().__init__() self.backend = backend self.file_path = file_path self.mode = mode self.kwargs = kwargs + self.trace = trace if create: raise ValueError("create option is not supported") @@ -74,13 +100,15 @@ def __init__(self, backend, file_path, mode='r', create=False, self._reset() def _reset(self): - obj = self.backend.open(self.file_path, - self.mode + 'b', - **self.kwargs) - self.fileobj = obj + with record("pfio.v2.Zip:create-zipfile-obj", trace=self.trace): + obj = self.backend.open(self.file_path, + self.mode + 'b', + **self.kwargs) + self.fileobj = obj + + assert self.fileobj is not None + self.zipobj = zipfile.ZipFile(self.fileobj, self.mode) - assert self.fileobj is not None - self.zipobj = zipfile.ZipFile(self.fileobj, self.mode) self.name_cache: Optional[Set[str]] = None if self._readonly: self.name_cache = self._names() @@ -108,43 +136,59 @@ def __repr__(self): def open(self, file_path, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None): - self._checkfork() + with record("pfio.v2.Zip:open", trace=self.trace): + self._checkfork() - file_path = os.path.join(self.cwd, os.path.normpath(file_path)) - fp = self.zipobj.open(file_path, mode.replace('b', '')) + file_path = os.path.join(self.cwd, os.path.normpath(file_path)) + fp = self.zipobj.open(file_path, mode.replace('b', '')) - if 'b' not in mode: - fp = io.TextIOWrapper(fp, encoding, errors, newline) + if 'b' not in mode: + fp = io.TextIOWrapper(fp, encoding, errors, newline) - return fp + if self.trace: + return ZipProfileIOWrapper(fp) + else: + return fp def subfs(self, path): # TODO raise NotImplementedError() def close(self): - self._checkfork() - self.zipobj.close() - self.fileobj.close() + with record("pfio.v2.Zip:close", trace=self.trace): + self._checkfork() + self.zipobj.close() + self.fileobj.close() def stat(self, path): - self._checkfork() - names = self._names() - path = os.path.join(self.cwd, os.path.normpath(path)) - if path in names: - actual_path = path - elif not path.endswith('/') and path + '/' in names: - # handles cases when path is a directory but without trailing slash - # see issue $67 - actual_path = path + '/' - else: - raise FileNotFoundError( - "{} is not found".format(path)) + with record("pfio.v2.Zip:stat", trace=self.trace): + self._checkfork() + names = self._names() + path = os.path.join(self.cwd, os.path.normpath(path)) + if path in names: + actual_path = path + elif not path.endswith('/') and path + '/' in names: + # handles cases when path is a directory + # but without trailing slash + # see issue $67 + actual_path = path + '/' + else: + raise FileNotFoundError( + "{} is not found".format(path)) - return ZipFileStat(self.zipobj.getinfo(actual_path)) + return ZipFileStat(self.zipobj.getinfo(actual_path)) def list(self, path_or_prefix: Optional[str] = "", recursive=False, detail=False): + for e in record_iterable("pfio.v2.Zip:list", + self._list(path_or_prefix, + recursive, + detail), + trace=self.trace): + yield e + + def _list(self, path_or_prefix: Optional[str] = "", recursive=False, + detail=False): self._checkfork() if path_or_prefix: @@ -207,18 +251,19 @@ def list(self, path_or_prefix: Optional[str] = "", recursive=False, yield return_file_name def isdir(self, file_path: str): - self._checkfork() - file_path = os.path.join(self.cwd, file_path) - if self.exists(file_path): - return self.stat(file_path).isdir() - else: - file_path = os.path.normpath(file_path) - # check if directories are NOT included in the zip - if any(name.startswith(file_path + "/") - for name in self._names()): - return True + with record("pfio.v2.Zip:isdir", trace=self.trace): + self._checkfork() + file_path = os.path.join(self.cwd, file_path) + if self.exists(file_path): + return self.stat(file_path).isdir() + else: + file_path = os.path.normpath(file_path) + # check if directories are NOT included in the zip + if any(name.startswith(file_path + "/") + for name in self._names()): + return True - return False + return False def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None): raise io.UnsupportedOperation("zip does not support mkdir") @@ -227,11 +272,12 @@ def makedirs(self, file_path: str, mode=0o777, exist_ok=False): raise io.UnsupportedOperation("zip does not support makedirs") def exists(self, file_path: str): - self._checkfork() - file_path = os.path.join(self.cwd, os.path.normpath(file_path)) - namelist = self.zipobj.namelist() - return (file_path in namelist - or file_path + "/" in namelist) + with record("pfio.v2.Zip:exists", trace=self.trace): + self._checkfork() + file_path = os.path.join(self.cwd, os.path.normpath(file_path)) + namelist = self.zipobj.namelist() + return (file_path in namelist + or file_path + "/" in namelist) def rename(self, *args): raise io.UnsupportedOperation diff --git a/tests/v2_tests/test_s3_zip.py b/tests/v2_tests/test_s3_zip.py index c7857aa8..89f109e3 100644 --- a/tests/v2_tests/test_s3_zip.py +++ b/tests/v2_tests/test_s3_zip.py @@ -1,4 +1,5 @@ import io +import json import multiprocessing import os import shutil @@ -186,3 +187,39 @@ def test_force_type2(): k = "dir/f" with s3z.open(k, 'wb') as fp: fp.write(b"bar") + + +@mock_aws +def test_s3_zip_profiling(): + ppe = pytest.importorskip("pytorch_pfn_extras") + + with tempfile.TemporaryDirectory() as tmpdir: + zipfilename = os.path.join(tmpdir, "test.zip") + zft = ZipForTest(zipfilename) + bucket = "test-dummy-bucket" + + with from_url('s3://{}/'.format(bucket), + create_bucket=True) as s3: + with open(zipfilename, 'rb') as src, \ + s3.open('test.zip', 'wb') as dst: + shutil.copyfileobj(src, dst) + + ppe.profiler.clear_tracer() + with from_url('s3://{}/test.zip'.format(bucket), + trace=True) as fs: + with fs.open('file', 'rb') as fp: + assert zft.content('file') == fp.read() + + state = ppe.profiler.get_tracer().state_dict() + keys = [event["name"] for event in json.loads(state['_event_list'])] + + assert "pfio.v2.Zip:create-zipfile-obj" in keys + assert "pfio.v2.Zip:open" in keys + assert "pfio.v2.Zip:read" in keys + assert "pfio.v2.Zip:close" in keys + + assert "pfio.v2.S3:open" in keys + assert "pfio.v2.S3:read" in keys + assert "pfio.v2.S3:close" in keys + + assert "pfio.boto3:get_object" in keys diff --git a/tests/v2_tests/test_zip.py b/tests/v2_tests/test_zip.py index adbb55f8..41df83ef 100644 --- a/tests/v2_tests/test_zip.py +++ b/tests/v2_tests/test_zip.py @@ -1,4 +1,5 @@ import io +import json import multiprocessing import os import pickle @@ -890,3 +891,31 @@ def test_is_zipfile(): with local.open_zip(zipfilename) as zfs: for o in zfs.list(recursive=True, detail=True): assert isinstance(o, ZipFileStat) + + +def test_zip_profiling(): + ppe = pytest.importorskip("pytorch_pfn_extras") + + with tempfile.TemporaryDirectory() as tmpdir: + zipfilename = os.path.join(tmpdir, 'test.zip') + _ = ZipForTest(zipfilename) + + ppe.profiler.clear_tracer() + with from_url(zipfilename, trace=True) as fs: + for name in fs.list(recursive=True): + with fs.open(name, mode='r') as fp: + _ = fp.read() + + state = ppe.profiler.get_tracer().state_dict() + keys = [event["name"] for event in json.loads(state['_event_list'])] + + assert "pfio.v2.Zip:create-zipfile-obj" in keys + assert "pfio.v2.Zip:list-0" in keys + assert "pfio.v2.Zip:list-1" in keys + assert "pfio.v2.Zip:open" in keys + assert "pfio.v2.Zip:read" in keys + assert "pfio.v2.Zip:close" in keys + + assert "pfio.v2.Local:open" in keys + assert "pfio.v2.Local:read" in keys + assert "pfio.v2.Local:close" in keys