Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support PPE profiling for Zip #344

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 92 additions & 46 deletions pfio/v2/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,36 @@
from datetime import datetime
from typing import Optional, Set

from ._profiler import record, record_iterable
from .fs import FS, FileStat, format_repr

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())


class ZipProfileIOWrapper:
    """Proxy around a file object that records calls made on it.

    Callable attributes fetched from the wrapped object are returned wrapped
    so that invoking them is recorded under ``pfio.v2.Zip:<attribute name>``.
    Non-callable attributes are passed through unchanged.

    Args:
        fp: The file-like object to wrap. It must support the context
            manager protocol for the wrapper to be used in ``with``.
    """

    def __init__(self, fp):
        self.fp = fp

    def __enter__(self):
        self.fp.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with record("pfio.v2.Zip:exit-context", trace=True):
            # Hand back whatever the wrapped object returns so the proxy
            # does not alter its exception-suppression decision.
            return self.fp.__exit__(exc_type, exc_value, traceback)

    # Implicit special-method lookup (``iter(obj)``, ``next(obj)``, ``for``)
    # goes through the type and never reaches ``__getattr__``, so iteration
    # has to be delegated explicitly.  These two are plain pass-throughs and
    # are not recorded.
    def __iter__(self):
        return iter(self.fp)

    def __next__(self):
        return next(self.fp)

    def __getattr__(self, name):
        """Forward attribute access to the wrapped file object.

        Raises:
            AttributeError: If the wrapped object lacks ``name``, or if the
                wrapper itself has no ``fp`` attribute yet.
        """
        # ``__getattr__`` only runs after normal lookup failed.  If ``fp``
        # itself is missing (an instance created through ``__new__``, as
        # copy/pickle do), reading ``self.fp`` below would re-enter this
        # method until RecursionError.
        if name == "fp":
            raise AttributeError(name)

        attr = getattr(self.fp, name)
        if not callable(attr):
            return attr

        def wrapper(*args, **kwargs):
            # Label with the requested name: not every callable carries a
            # ``__name__`` attribute.
            with record(f"pfio.v2.Zip:{name}", trace=True):
                return attr(*args, **kwargs)
        return wrapper


class ZipFileStat(FileStat):
"""Detailed information of a file in a Zip

Expand Down Expand Up @@ -51,13 +75,15 @@ def __init__(self, zip_info):
class Zip(FS):
_readonly = True

def __init__(self, backend, file_path, mode='r', create=False,
local_cache=False, local_cachedir=None, **kwargs):
def __init__(self, backend, file_path, mode='r',
create=False, local_cache=False, local_cachedir=None,
trace=False, **kwargs):
super().__init__()
self.backend = backend
self.file_path = file_path
self.mode = mode
self.kwargs = kwargs
self.trace = trace

if create:
raise ValueError("create option is not supported")
Expand All @@ -74,13 +100,15 @@ def __init__(self, backend, file_path, mode='r', create=False,
self._reset()

def _reset(self):
obj = self.backend.open(self.file_path,
self.mode + 'b',
**self.kwargs)
self.fileobj = obj
with record("pfio.v2.Zip:create-zipfile-obj", trace=self.trace):
obj = self.backend.open(self.file_path,
self.mode + 'b',
**self.kwargs)
self.fileobj = obj

assert self.fileobj is not None
self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)

assert self.fileobj is not None
self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)
self.name_cache: Optional[Set[str]] = None
if self._readonly:
self.name_cache = self._names()
Expand Down Expand Up @@ -108,43 +136,59 @@ def __repr__(self):
def open(self, file_path, mode='r',
buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):
self._checkfork()
with record("pfio.v2.Zip:open", trace=self.trace):
self._checkfork()

file_path = os.path.join(self.cwd, os.path.normpath(file_path))
fp = self.zipobj.open(file_path, mode.replace('b', ''))
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
fp = self.zipobj.open(file_path, mode.replace('b', ''))

if 'b' not in mode:
fp = io.TextIOWrapper(fp, encoding, errors, newline)
if 'b' not in mode:
fp = io.TextIOWrapper(fp, encoding, errors, newline)

return fp
if self.trace:
return ZipProfileIOWrapper(fp)
else:
return fp

def subfs(self, path):
    """Not implemented for Zip; always raises ``NotImplementedError``."""
    # TODO
    raise NotImplementedError()

def close(self):
self._checkfork()
self.zipobj.close()
self.fileobj.close()
with record("pfio.v2.Zip:close", trace=self.trace):
self._checkfork()
self.zipobj.close()
self.fileobj.close()

def stat(self, path):
self._checkfork()
names = self._names()
path = os.path.join(self.cwd, os.path.normpath(path))
if path in names:
actual_path = path
elif not path.endswith('/') and path + '/' in names:
# handles cases when path is a directory but without trailing slash
# see issue $67
actual_path = path + '/'
else:
raise FileNotFoundError(
"{} is not found".format(path))
with record("pfio.v2.Zip:stat", trace=self.trace):
self._checkfork()
names = self._names()
path = os.path.join(self.cwd, os.path.normpath(path))
if path in names:
actual_path = path
elif not path.endswith('/') and path + '/' in names:
# handles cases when path is a directory
# but without trailing slash
# see issue $67
actual_path = path + '/'
else:
raise FileNotFoundError(
"{} is not found".format(path))

return ZipFileStat(self.zipobj.getinfo(actual_path))
return ZipFileStat(self.zipobj.getinfo(actual_path))

def list(self, path_or_prefix: Optional[str] = "", recursive=False,
         detail=False):
    """Yield the entries produced by ``_list`` through ``record_iterable``.

    The arguments are forwarded positionally to ``_list``; the iteration
    is recorded under the name ``pfio.v2.Zip:list`` with this
    filesystem's ``trace`` setting.
    """
    entries = self._list(path_or_prefix, recursive, detail)
    recorded = record_iterable("pfio.v2.Zip:list", entries,
                               trace=self.trace)
    for entry in recorded:
        yield entry

def _list(self, path_or_prefix: Optional[str] = "", recursive=False,
detail=False):
self._checkfork()

if path_or_prefix:
Expand Down Expand Up @@ -207,18 +251,19 @@ def list(self, path_or_prefix: Optional[str] = "", recursive=False,
yield return_file_name

def isdir(self, file_path: str):
self._checkfork()
file_path = os.path.join(self.cwd, file_path)
if self.exists(file_path):
return self.stat(file_path).isdir()
else:
file_path = os.path.normpath(file_path)
# check if directories are NOT included in the zip
if any(name.startswith(file_path + "/")
for name in self._names()):
return True
with record("pfio.v2.Zip:isdir", trace=self.trace):
self._checkfork()
file_path = os.path.join(self.cwd, file_path)
if self.exists(file_path):
return self.stat(file_path).isdir()
else:
file_path = os.path.normpath(file_path)
# check if directories are NOT included in the zip
if any(name.startswith(file_path + "/")
for name in self._names()):
return True

return False
return False

def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
    """Unsupported for zip; always raises ``io.UnsupportedOperation``."""
    raise io.UnsupportedOperation("zip does not support mkdir")
Expand All @@ -227,11 +272,12 @@ def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
raise io.UnsupportedOperation("zip does not support makedirs")

def exists(self, file_path: str):
self._checkfork()
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
namelist = self.zipobj.namelist()
return (file_path in namelist
or file_path + "/" in namelist)
with record("pfio.v2.Zip:exists", trace=self.trace):
self._checkfork()
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
namelist = self.zipobj.namelist()
return (file_path in namelist
or file_path + "/" in namelist)

def rename(self, *args):
raise io.UnsupportedOperation
Expand Down
37 changes: 37 additions & 0 deletions tests/v2_tests/test_s3_zip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import multiprocessing
import os
import shutil
Expand Down Expand Up @@ -186,3 +187,39 @@ def test_force_type2():
k = "dir/f"
with s3z.open(k, 'wb') as fp:
fp.write(b"bar")


@mock_aws
def test_s3_zip_profiling():
    """Reading a zip stored on (mocked) S3 with ``trace=True`` emits events.

    Skipped when ``pytorch_pfn_extras`` is not installed.
    """
    ppe = pytest.importorskip("pytorch_pfn_extras")

    with tempfile.TemporaryDirectory() as tmpdir:
        local_zip = os.path.join(tmpdir, "test.zip")
        fixture = ZipForTest(local_zip)
        bucket = "test-dummy-bucket"

        # Upload the fixture archive into the mocked bucket.
        bucket_url = 's3://{}/'.format(bucket)
        with from_url(bucket_url, create_bucket=True) as s3:
            with open(local_zip, 'rb') as src, \
                    s3.open('test.zip', 'wb') as dst:
                shutil.copyfileobj(src, dst)

        # Start from an empty tracer so only the traced read is captured.
        ppe.profiler.clear_tracer()
        zip_url = 's3://{}/test.zip'.format(bucket)
        with from_url(zip_url, trace=True) as fs:
            with fs.open('file', 'rb') as fp:
                assert fixture.content('file') == fp.read()

        tracer_state = ppe.profiler.get_tracer().state_dict()
        events = json.loads(tracer_state['_event_list'])
        keys = [event["name"] for event in events]

        expected_names = (
            "pfio.v2.Zip:create-zipfile-obj",
            "pfio.v2.Zip:open",
            "pfio.v2.Zip:read",
            "pfio.v2.Zip:close",
            "pfio.v2.S3:open",
            "pfio.v2.S3:read",
            "pfio.v2.S3:close",
            "pfio.boto3:get_object",
        )
        for expected in expected_names:
            assert expected in keys
29 changes: 29 additions & 0 deletions tests/v2_tests/test_zip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import multiprocessing
import os
import pickle
Expand Down Expand Up @@ -890,3 +891,31 @@ def test_is_zipfile():
with local.open_zip(zipfilename) as zfs:
for o in zfs.list(recursive=True, detail=True):
assert isinstance(o, ZipFileStat)


def test_zip_profiling():
    """Listing and reading a local zip with ``trace=True`` emits events.

    Skipped when ``pytorch_pfn_extras`` is not installed.
    """
    ppe = pytest.importorskip("pytorch_pfn_extras")

    with tempfile.TemporaryDirectory() as tmpdir:
        zip_path = os.path.join(tmpdir, 'test.zip')
        _ = ZipForTest(zip_path)

        # Start from an empty tracer so only the traced access is captured.
        ppe.profiler.clear_tracer()
        with from_url(zip_path, trace=True) as fs:
            for member in fs.list(recursive=True):
                with fs.open(member, mode='r') as fp:
                    _ = fp.read()

        tracer_state = ppe.profiler.get_tracer().state_dict()
        events = json.loads(tracer_state['_event_list'])
        keys = [event["name"] for event in events]

        expected_names = (
            "pfio.v2.Zip:create-zipfile-obj",
            "pfio.v2.Zip:list-0",
            "pfio.v2.Zip:list-1",
            "pfio.v2.Zip:open",
            "pfio.v2.Zip:read",
            "pfio.v2.Zip:close",
            "pfio.v2.Local:open",
            "pfio.v2.Local:read",
            "pfio.v2.Local:close",
        )
        for expected in expected_names:
            assert expected in keys