Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DataLoader with num_workers > 0 in streaming mode #4375

Merged
merged 24 commits into master from parallel-torch-iterable-dataset
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
58289fc
make TorchIterableDataset work in parallel
lhoestq Apr 4, 2022
c26a9f1
start writing some tests
lhoestq Apr 4, 2022
4c3ce96
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq May 19, 2022
8c60fa3
fix streaming extension and fsspec issues in subprocesses
lhoestq May 19, 2022
6dc859e
fix some tests
lhoestq May 19, 2022
7056f1a
fix more tests
lhoestq May 19, 2022
edef69b
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq Jun 1, 2022
c0a0492
fix import
lhoestq Jun 1, 2022
7043816
fix and add tests
lhoestq Jun 2, 2022
a9ea955
fix patch (handle successive patches and builtins)
lhoestq Jun 2, 2022
07d4c0e
revert unnecessary change to enriched_web_blg
lhoestq Jun 2, 2022
af5de1a
style
lhoestq Jun 2, 2022
b84ae0e
use open locally to fix win permission errors
lhoestq Jun 7, 2022
1746712
keep file opened in read_csv
lhoestq Jun 7, 2022
bc837ce
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq Jun 7, 2022
fe269bf
fix compression for read_csv
lhoestq Jun 7, 2022
482c4fb
consistency of read_csv: don't infer compression for file-like objects
lhoestq Jun 7, 2022
54e9f39
stringify Path objects
lhoestq Jun 7, 2022
8f5579e
comments + raise error if sharding is ambiguous
lhoestq Jun 9, 2022
ab91dbd
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq Jun 9, 2022
1b87fb3
minor
lhoestq Jun 9, 2022
b675a69
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq Jun 9, 2022
816d591
Update src/datasets/iterable_dataset.py
lhoestq Jun 10, 2022
ff586c4
Merge branch 'master' into parallel-torch-iterable-dataset
lhoestq Jun 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/datasets/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from .iterable_dataset import ExamplesIterable, IterableDataset, _generate_examples_from_tables_wrapper
from .naming import camelcase_to_snakecase
from .splits import Split, SplitDict, SplitGenerator
from .streaming import extend_dataset_builder_for_streaming
from .utils import logging
from .utils.file_utils import cached_path, is_remote_url
from .utils.filelock import FileLock
Expand Down Expand Up @@ -337,6 +338,17 @@ def __init__(
# Record infos even if verify_infos=False; used by "datasets-cli test" to generate dataset_infos.json
self._record_infos = False

# Enable streaming (e.g. it patches "open" to work with remote files)
extend_dataset_builder_for_streaming(self)

def __getstate__(self):
return self.__dict__

def __setstate__(self, d):
self.__dict__ = d
# Re-enable streaming, since patched functions are not kept when pickling
extend_dataset_builder_for_streaming(self)

# Must be set for datasets that use 'data_dir' functionality - the ones
# that require users to do additional steps to download the data
# (this is usually due to some external regulations / rules).
Expand Down Expand Up @@ -980,14 +992,13 @@ def as_streaming_dataset(
self,
split: Optional[str] = None,
base_path: Optional[str] = None,
use_auth_token: Optional[str] = None,
) -> Union[Dict[str, IterableDataset], IterableDataset]:
if not isinstance(self, (GeneratorBasedBuilder, ArrowBasedBuilder)):
raise ValueError(f"Builder {self.name} is not streamable.")

dl_manager = StreamingDownloadManager(
base_path=base_path or self.base_path,
download_config=DownloadConfig(use_auth_token=use_auth_token),
download_config=DownloadConfig(use_auth_token=self.use_auth_token),
dataset_name=self.name,
data_dir=self.config.data_dir,
)
Expand Down
226 changes: 116 additions & 110 deletions src/datasets/download/streaming_download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,19 +326,6 @@ def _as_posix(path: Path):
return path_as_posix


def xpathjoin(a: Path, *p: Tuple[str, ...]):
"""Extend :func:`xjoin` to support argument of type :obj:`~pathlib.Path`.

Args:
a (:obj:`~pathlib.Path`): Calling Path instance.
*p (:obj:`tuple` of :obj:`str`): Other path components.

Returns:
obj:`str`
"""
return type(a)(xjoin(_as_posix(a), *p))


def _add_retries_to_file_obj_read_method(file_obj):
read = file_obj.read
max_retries = config.STREAMING_READ_MAX_RETRIES
Expand Down Expand Up @@ -434,6 +421,8 @@ def xopen(file: str, mode="r", *args, use_auth_token: Optional[Union[str, bool]]
# required for `xopen(str(Path(...)))` to work
file = _as_posix(PurePath(file))
main_hop, *rest_hops = file.split("::")
if is_local_path(main_hop):
return open(file, mode, *args, **kwargs)
# add headers and cookies for authentication on the HF Hub and for Google Drive
if not rest_hops and (main_hop.startswith("http://") or main_hop.startswith("https://")):
file, new_kwargs = _prepare_http_url_kwargs(file, use_auth_token=use_auth_token)
Expand Down Expand Up @@ -484,19 +473,6 @@ def xlistdir(path: str, use_auth_token: Optional[Union[str, bool]] = None) -> Li
return [os.path.basename(obj["name"]) for obj in objects]


def xpathopen(path: Path, *args, **kwargs):
"""Extend :func:`xopen` to support argument of type :obj:`~pathlib.Path`.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
**kwargs: Keyword arguments passed to :func:`fsspec.open`.

Returns:
:obj:`io.FileIO`: File-like object.
"""
return xopen(_as_posix(path), *args, **kwargs)


def xglob(urlpath, *, recursive=False, use_auth_token: Optional[Union[str, bool]] = None):
"""Extend `glob.glob` function to support remote files.

Expand Down Expand Up @@ -529,126 +505,153 @@ def xglob(urlpath, *, recursive=False, use_auth_token: Optional[Union[str, bool]
return ["::".join([f"{fs.protocol}://{globbed_path}"] + rest_hops) for globbed_path in globbed_paths]


def xpathglob(path, pattern, use_auth_token: Optional[Union[str, bool]] = None):
"""Glob function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
def xwalk(urlpath, use_auth_token: Optional[Union[str, bool]] = None):
"""Extend `os.walk` function to support remote files.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
pattern (:obj:`str`): Pattern that resulting paths must match.
urlpath (:obj:`str`): URL root path.
use_auth_token (:obj:`bool` or :obj:`str`, optional): Whether to use token or token to authenticate on the
Hugging Face Hub for private remote files.

Yields:
:obj:`~pathlib.Path`
:obj:`tuple`: 3-tuple (dirpath, dirnames, filenames).
"""
posix_path = _as_posix(path)
main_hop, *rest_hops = posix_path.split("::")
main_hop, *rest_hops = str(urlpath).split("::")
if is_local_path(main_hop):
yield from Path(main_hop).glob(pattern)
yield from os.walk(main_hop)
else:
# globbing inside a zip in a private repo requires authentication
# walking inside a zip in a private repo requires authentication
if rest_hops and (rest_hops[0].startswith("http://") or rest_hops[0].startswith("https://")):
url = rest_hops[0]
url, kwargs = _prepare_http_url_kwargs(url, use_auth_token=use_auth_token)
storage_options = {"https": kwargs}
posix_path = "::".join([main_hop, url, *rest_hops[1:]])
urlpath = "::".join([main_hop, url, *rest_hops[1:]])
else:
storage_options = None
fs, *_ = fsspec.get_fs_token_paths(xjoin(posix_path, pattern), storage_options=storage_options)
# - If there's no "*" in the pattern, get_fs_token_paths() doesn't do any pattern matching
# so to be able to glob patterns like "[0-9]", we have to call `fs.glob`.
# - Also "*" in get_fs_token_paths() only matches files: we have to call `fs.glob` to match directories.
# - If there is "**" in the pattern, `fs.glob` must be called anyway.
globbed_paths = fs.glob(xjoin(main_hop, pattern))
for globbed_path in globbed_paths:
yield type(path)("::".join([f"{fs.protocol}://{globbed_path}"] + rest_hops))
fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
for dirpath, dirnames, filenames in fs.walk(main_hop):
yield "::".join([f"{fs.protocol}://{dirpath}"] + rest_hops), dirnames, filenames


def xpathrglob(path, pattern, **kwargs):
"""Rglob function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
class xPath(type(Path())):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many changes in this file are just about moving functions inside this class.

For example, I moved `xpathrglob` to `xPath.rglob`.

def glob(self, pattern, use_auth_token: Optional[Union[str, bool]] = None):
"""Glob function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
pattern (:obj:`str`): Pattern that resulting paths must match.
Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
pattern (:obj:`str`): Pattern that resulting paths must match.

Yields:
:obj:`~pathlib.Path`
"""
return xpathglob(path, "**/" + pattern, **kwargs)
Yields:
:obj:`~pathlib.Path`
"""
posix_path = _as_posix(self)
main_hop, *rest_hops = posix_path.split("::")
if is_local_path(main_hop):
yield from Path(main_hop).glob(pattern)
else:
# globbing inside a zip in a private repo requires authentication
if rest_hops and (rest_hops[0].startswith("http://") or rest_hops[0].startswith("https://")):
url = rest_hops[0]
url, kwargs = _prepare_http_url_kwargs(url, use_auth_token=use_auth_token)
storage_options = {"https": kwargs}
posix_path = "::".join([main_hop, url, *rest_hops[1:]])
else:
storage_options = None
fs, *_ = fsspec.get_fs_token_paths(xjoin(posix_path, pattern), storage_options=storage_options)
# - If there's no "*" in the pattern, get_fs_token_paths() doesn't do any pattern matching
# so to be able to glob patterns like "[0-9]", we have to call `fs.glob`.
# - Also "*" in get_fs_token_paths() only matches files: we have to call `fs.glob` to match directories.
# - If there is "**" in the pattern, `fs.glob` must be called anyway.
globbed_paths = fs.glob(xjoin(main_hop, pattern))
for globbed_path in globbed_paths:
yield type(self)("::".join([f"{fs.protocol}://{globbed_path}"] + rest_hops))

def rglob(self, pattern, **kwargs):
"""Rglob function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
pattern (:obj:`str`): Pattern that resulting paths must match.

def xpathparent(path: Path):
"""Name function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
Yields:
:obj:`~pathlib.Path`
"""
return self.glob("**/" + pattern, **kwargs)

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
@property
def parent(self) -> "xPath":
"""Parent function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

Returns:
:obj:`~pathlib.Path`
"""
return type(path)(xdirname(_as_posix(path)))
Args:
path (:obj:`~pathlib.Path`): Calling Path instance.

Returns:
:obj:`~pathlib.Path`
"""
return type(self)(xdirname(_as_posix(self)))

@property
def name(self) -> PurePosixPath:
"""Name function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

def xpathname(path: Path):
"""Name function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
Args:
path (:obj:`~pathlib.Path`): Calling Path instance.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(self).split("::")[0]).name

Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(path).split("::")[0]).name
@property
def stem(self) -> PurePosixPath:
"""Stem function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.

def xpathstem(path: Path):
"""Stem function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(self).split("::")[0]).stem

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
@property
def suffix(self) -> PurePosixPath:
"""Suffix function for argument of type :obj:`~pathlib.Path` that supports both local paths and remote URLs.

Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(path).split("::")[0]).stem
Args:
path (:obj:`~pathlib.Path`): Calling Path instance.

Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(self).split("::")[0]).suffix

def xpathsuffix(path: Path):
"""Suffix function for argument of type :obj:`~pathlib.Path` that supports both local paths end remote URLs.
def open(self, *args, **kwargs):
"""Extend :func:`xopen` to support argument of type :obj:`~pathlib.Path`.

Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
Args:
path (:obj:`~pathlib.Path`): Calling Path instance.
**kwargs: Keyword arguments passed to :func:`fsspec.open`.

Returns:
:obj:`str`
"""
return PurePosixPath(_as_posix(path).split("::")[0]).suffix
Returns:
:obj:`io.FileIO`: File-like object.
"""
return xopen(_as_posix(self), *args, **kwargs)

def joinpath(self, *p: Tuple[str, ...]) -> "xPath":
"""Extend :func:`xjoin` to support argument of type :obj:`~pathlib.Path`.

def xwalk(urlpath, use_auth_token: Optional[Union[str, bool]] = None):
"""Extend `os.walk` function to support remote files.
Args:
a (:obj:`~pathlib.Path`): Calling Path instance.
*p (:obj:`tuple` of :obj:`str`): Other path components.

Args:
urlpath (:obj:`str`): URL root path.
use_auth_token (:obj:`bool` or :obj:`str`, optional): Whether to use token or token to authenticate on the
Hugging Face Hub for private remote files.
Returns:
obj:`str`
"""
return type(self)(xjoin(_as_posix(self), *p))

Yields:
:obj:`tuple`: 3-tuple (dirpath, dirnames, filenames).
"""
main_hop, *rest_hops = str(urlpath).split("::")
if is_local_path(main_hop):
yield from os.walk(main_hop)
else:
# walking inside a zip in a private repo requires authentication
if rest_hops and (rest_hops[0].startswith("http://") or rest_hops[0].startswith("https://")):
url = rest_hops[0]
url, kwargs = _prepare_http_url_kwargs(url, use_auth_token=use_auth_token)
storage_options = {"https": kwargs}
urlpath = "::".join([main_hop, url, *rest_hops[1:]])
else:
storage_options = None
fs, *_ = fsspec.get_fs_token_paths(urlpath, storage_options=storage_options)
for dirpath, dirnames, filenames in fs.walk(main_hop):
yield "::".join([f"{fs.protocol}://{dirpath}"] + rest_hops), dirnames, filenames
def __truediv__(self, p: str) -> "xPath":
return self.joinpath(p)


def xpandas_read_csv(filepath_or_buffer, use_auth_token: Optional[Union[str, bool]] = None, **kwargs):
Expand All @@ -657,7 +660,10 @@ def xpandas_read_csv(filepath_or_buffer, use_auth_token: Optional[Union[str, boo
if hasattr(filepath_or_buffer, "read"):
return pd.read_csv(filepath_or_buffer, **kwargs)
else:
return pd.read_csv(xopen(filepath_or_buffer, use_auth_token=use_auth_token), **kwargs)
filepath_or_buffer = str(filepath_or_buffer)
if kwargs.get("compression", "infer") == "infer":
kwargs["compression"] = _get_extraction_protocol(filepath_or_buffer, use_auth_token=use_auth_token)
return pd.read_csv(xopen(filepath_or_buffer, "rb", use_auth_token=use_auth_token), **kwargs)


def xpandas_read_excel(filepath_or_buffer, **kwargs):
Expand Down
7 changes: 5 additions & 2 deletions src/datasets/features/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ def decode_example(self, value: dict, token_per_repo_id=None) -> "PIL.Image.Imag
else:
raise ImportError("To support decoding images, please install 'Pillow'.")

if token_per_repo_id is None:
token_per_repo_id = {}

path, bytes_ = value["path"], value["bytes"]
if bytes_ is None:
if path is None:
Expand All @@ -136,8 +139,8 @@ def decode_example(self, value: dict, token_per_repo_id=None) -> "PIL.Image.Imag
source_url = path.split("::")[-1]
try:
repo_id = string_to_dict(source_url, config.HUB_DATASETS_URL)["repo_id"]
use_auth_token = token_per_repo_id[repo_id]
except (ValueError, KeyError):
use_auth_token = token_per_repo_id.get(repo_id)
except ValueError:
use_auth_token = None
with xopen(path, "rb", use_auth_token=use_auth_token) as f:
bytes_ = BytesIO(f.read())
Expand Down
Empty file.
Loading