diff --git a/test/test_fsspec.py b/test/test_fsspec.py index 14793b4e4..7e596ef6e 100644 --- a/test/test_fsspec.py +++ b/test/test_fsspec.py @@ -94,10 +94,10 @@ def test_fsspec_file_loader_iterdatapipe(self): # Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted self._write_text_files() lister_dp = FileLister(self.temp_dir.name, "*.text") - fsspec_file_loader_dp = FSSpecFileOpener(lister_dp, mode="rb") + fsspec_file_opener_dp = lister_dp.open_files_by_fsspec(mode="rb") n_elements_before_reset = 2 - res_before_reset, res_after_reset = reset_after_n_next_calls(fsspec_file_loader_dp, n_elements_before_reset) + res_before_reset, res_after_reset = reset_after_n_next_calls(fsspec_file_opener_dp, n_elements_before_reset) self.assertEqual(2, len(res_before_reset)) self.assertEqual(3, len(res_after_reset)) for _name, stream in res_before_reset: diff --git a/test/test_local_io.py b/test/test_local_io.py index dea43e633..3be37a5ed 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -684,10 +684,10 @@ def test_io_path_file_loader_iterdatapipe(self): # Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted self._write_text_files() lister_dp = FileLister(self.temp_dir.name, "*.text") - iopath_file_loader_dp = IoPathFileOpener(lister_dp, mode="rb") + iopath_file_opener_dp = lister_dp.open_files_by_iopath(mode="rb") n_elements_before_reset = 2 - res_before_reset, res_after_reset = reset_after_n_next_calls(iopath_file_loader_dp, n_elements_before_reset) + res_before_reset, res_after_reset = reset_after_n_next_calls(iopath_file_opener_dp, n_elements_before_reset) self.assertEqual(2, len(res_before_reset)) self.assertEqual(3, len(res_after_reset)) for _name, stream in res_before_reset: diff --git a/torchdata/datapipes/iter/load/README.md b/torchdata/datapipes/iter/load/README.md index 66466cfa2..731914402 100644 --- a/torchdata/datapipes/iter/load/README.md +++ b/torchdata/datapipes/iter/load/README.md @@ -35,8 +35,8 @@ Note: refer to the official documentation for detailed installtion instructions ### S3FileLister -`S3FileLister` accepts a list of S3 prefixes and iterates all matching s3 urls. The functional API is `list_file_by_s3`. -Acceptable prefixes include `s3://bucket-name`, `s3://bucket-name/`, `s3://bucket-name/folder`, +`S3FileLister` accepts a list of S3 prefixes and iterates all matching s3 urls. The functional API is +`list_files_by_s3`. Acceptable prefixes include `s3://bucket-name`, `s3://bucket-name/`, `s3://bucket-name/folder`, `s3://bucket-name/folder/`, and `s3://bucket-name/prefix`. You may also set `length`, `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), and `region`. Note that: @@ -48,7 +48,7 @@ ms in aws-sdk-cpp), and `region`. Note that: ### S3FileLoader `S3FileLoader` accepts a list of S3 URLs and iterates all files in `BytesIO` format with `(url, BytesIO)` tuples. The -functional API is `load_file_by_s3`. You may also set `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), `region`, +functional API is `load_files_by_s3`. You may also set `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), `region`, `buffer_size` (default 120Mb), and `multi_part_download` (default to use multi-part downloading). Note that: 1. Input **must** be a list and S3 URLs must be valid. diff --git a/torchdata/datapipes/iter/load/fsspec.py b/torchdata/datapipes/iter/load/fsspec.py index fa72fae43..2ded99eee 100644 --- a/torchdata/datapipes/iter/load/fsspec.py +++ b/torchdata/datapipes/iter/load/fsspec.py @@ -101,11 +101,11 @@ def __iter__(self) -> Iterator[str]: yield abs_path -@functional_datapipe("open_file_by_fsspec") +@functional_datapipe("open_files_by_fsspec") class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]): r""" Opens files from input datapipe which contains `fsspec` paths and yields a tuple of - pathname and opened file stream (functional name: ``open_file_by_fsspec``). + pathname and opened file stream (functional name: ``open_files_by_fsspec``). Args: source_datapipe: Iterable DataPipe that provides the pathnames or URLs @@ -114,7 +114,7 @@ class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]): Example: >>> from torchdata.datapipes.iter import FSSpecFileLister >>> datapipe = FSSpecFileLister(root=dir_path) - >>> file_dp = datapipe.open_file_by_fsspec() + >>> file_dp = datapipe.open_files_by_fsspec() """ def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None: @@ -133,6 +133,10 @@ def __len__(self) -> int: return len(self.source_datapipe) +# Register for functional API for backward compatibility +IterDataPipe.register_datapipe_as_function("open_file_by_fsspec", FSSpecFileOpenerIterDataPipe) + + @functional_datapipe("save_by_fsspec") class FSSpecSaverIterDataPipe(IterDataPipe[str]): r""" diff --git a/torchdata/datapipes/iter/load/iopath.py b/torchdata/datapipes/iter/load/iopath.py index 22d4e4c2c..0a04d6517 100644 --- a/torchdata/datapipes/iter/load/iopath.py +++ b/torchdata/datapipes/iter/load/iopath.py @@ -96,11 +96,11 @@ def __iter__(self) -> Iterator[str]: yield os.path.join(path, file_name) -@functional_datapipe("open_file_by_iopath") +@functional_datapipe("open_files_by_iopath") class IoPathFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]): r""" Opens files from input datapipe which contains pathnames or URLs, - and yields a tuple of pathname and opened file stream (functional name: ``open_file_by_iopath``). + and yields a tuple of pathname and opened file stream (functional name: ``open_files_by_iopath``). Args: source_datapipe: Iterable DataPipe that provides the pathnames or URLs @@ -114,7 +114,7 @@ class IoPathFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]): Example: >>> from torchdata.datapipes.iter import IoPathFileLister >>> datapipe = IoPathFileLister(root=S3URL) - >>> file_dp = datapipe.open_file_by_iopath() + >>> file_dp = datapipe.open_files_by_iopath() """ def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r", pathmgr=None) -> None: @@ -141,6 +141,10 @@ def __len__(self) -> int: return len(self.source_datapipe) +# Register for functional API for backward compatibility +IterDataPipe.register_datapipe_as_function("open_file_by_iopath", IoPathFileOpenerIterDataPipe) + + @functional_datapipe("save_by_iopath") class IoPathSaverIterDataPipe(IterDataPipe[str]): diff --git a/torchdata/datapipes/iter/load/s3io.py b/torchdata/datapipes/iter/load/s3io.py index 7b03031f6..086156e8c 100644 --- a/torchdata/datapipes/iter/load/s3io.py +++ b/torchdata/datapipes/iter/load/s3io.py @@ -13,10 +13,10 @@ from torchdata.datapipes.utils import StreamWrapper -@functional_datapipe("list_file_by_s3") +@functional_datapipe("list_files_by_s3") class S3FileListerIterDataPipe(IterDataPipe[str]): r""" - Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: ``list_file_by_s3``). + Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: ``list_files_by_s3``). Acceptable prefixes include ``s3://bucket-name``, ``s3://bucket-name/``, ``s3://bucket-name/folder``, ``s3://bucket-name/folder/``, and ``s3://bucket-name/prefix``. You may also set ``length``, ``request_timeout_ms`` (default 3000 ms in aws-sdk-cpp), and ``region``. @@ -72,10 +72,10 @@ def __len__(self) -> int: return self.length -@functional_datapipe("load_file_by_s3") +@functional_datapipe("load_files_by_s3") class S3FileLoaderIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]): r""" - Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: ``load_file_by_s3``). + Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: ``load_files_by_s3``). ``S3FileLoader`` iterates all given S3 URLs in ``BytesIO`` format with ``(url, BytesIO)`` tuples. You may also set ``request_timeout_ms`` (default 3000 ms in aws-sdk-cpp), ``region``, ``buffer_size`` (default 120Mb), and ``multi_part_download`` (default to use multi-part downloading).