[Release 0.5.0] Adding Cloud Storage Provider tutorial section #813

Merged 1 commit on Oct 7, 2022
90 changes: 90 additions & 0 deletions docs/source/tutorial.rst
@@ -294,3 +294,93 @@ The stack of DataPipes can then be constructed using their functional forms (rec

In the above example, ``datapipes1`` and ``datapipes2`` represent the exact same stack of ``IterDataPipe``\s. We
recommend using the functional form of DataPipes.

Working with Cloud Storage Providers
---------------------------------------------

In this section, we show examples of accessing AWS S3 and Google Cloud Storage with the built-in
``fsspec`` DataPipes. Although only those two providers are discussed here, with additional libraries,
``fsspec`` DataPipes should allow you to connect with other storage systems as well (`list of known
implementations <https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations>`_).
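
For example, the built-in ``file://`` protocol requires no extra library; the minimal sketch below
(assuming a local ``/tmp`` directory exists) lists local files with the same DataPipe used in the
cloud examples that follow:

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

# "file://" is handled by fsspec itself; other protocols need their plug-in library installed
dp = IterableWrapper(["file:///tmp"]).list_files_by_fsspec()
print(list(dp))  # paths of the files under /tmp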

Let us know on GitHub if you have a request to support another cloud storage provider,
or if you have code examples to share with the community.

Accessing AWS S3 with ``fsspec`` DataPipes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This requires the installation of the libraries ``fsspec``
(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``s3fs``
(`s3fs GitHub repo <https://github.com/fsspec/s3fs>`_).

You can list the files within an S3 bucket directory by passing a path that starts
with ``"s3://BUCKET_NAME"`` to
`FSSpecFileLister <generated/torchdata.datapipes.iter.FSSpecFileLister.html>`_ (``.list_files_by_fsspec(...)``).

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()

You can also open files using `FSSpecFileOpener <generated/torchdata.datapipes.iter.FSSpecFileOpener.html>`_
(``.open_files_by_fsspec(...)``) and stream them
(if supported by the file format).
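
For instance, here is a brief sketch (with placeholder bucket and directory names) that lists a
directory and reads the first few bytes of each object:

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

# BUCKET_NAME and DIRECTORY are placeholders for your own bucket layout
dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY"]).list_files_by_fsspec()
dp = dp.open_files_by_fsspec(mode="rb")  # yields (path, stream) pairs
for path, stream in dp:
    print(path, stream.read(16))  # read the first 16 bytes of each object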

Note that you can also provide additional parameters via
the argument ``kwargs_for_open``. This can be useful for purposes such as accessing a specific
bucket version, which you can do by passing in ``{"version_id": "SOMEVERSIONID"}`` (more `details
about S3 bucket version awareness <https://s3fs.readthedocs.io/en/latest/#bucket-version-awareness>`_
from ``s3fs``). The supported arguments vary by the (cloud) file system that you are accessing.
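
As a sketch, assuming a version-aware bucket and a placeholder version ID, this could look like:

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/OBJECT_NAME"])
dp = dp.open_files_by_fsspec(
    mode="rb",
    version_aware=True,  # assumption: s3fs needs version awareness enabled on the file system
    kwargs_for_open={"version_id": "SOMEVERSIONID"},  # placeholder ID, forwarded to fs.open(...)
)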

In the example below, we are streaming the archive by using
`TarArchiveLoader <generated/torchdata.datapipes.iter.TarArchiveLoader.html#>`_ (``.load_from_tar(mode="r|")``),
in contrast with the usual ``mode="r:"``. This allows us to begin processing data inside the archive
without downloading the whole archive into memory first.

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|") # Streaming version
# The rest of data processing logic goes here


Finally, `FSSpecSaver <generated/torchdata.datapipes.iter.FSSpecSaver.html>`_
(``.save_by_fsspec(...)``) is also available for writing data to cloud storage.
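
As a short sketch with hypothetical file names and payloads, its functional form
``.save_by_fsspec(...)`` consumes ``(path, data)`` pairs and yields the path of each file it writes:

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

def filepath_fn(name: str) -> str:
    # Prepend the destination; BUCKET_NAME is a placeholder for your own bucket
    return "s3://BUCKET_NAME/" + name

# Hypothetical in-memory payloads to upload
name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2"}
dp = IterableWrapper(sorted(name_to_data.items()))
dp = dp.save_by_fsspec(filepath_fn=filepath_fn, mode="wb")
for saved_path in dp:
    print(saved_path)  # the full path each file was written to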

Accessing Google Cloud Storage (GCS) with ``fsspec`` DataPipes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This requires the installation of the libraries ``fsspec``
(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``gcsfs``
(`gcsfs GitHub repo <https://github.com/fsspec/gcsfs>`_).

You can list the files within a GCS bucket directory by specifying a path that starts
with ``"gcs://BUCKET_NAME"``. The bucket name in the example below is ``uspto-pair``.

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
print(list(dp))
# ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']

Here is an example of loading the zip file ``05900035.zip`` from the directory ``applications``
inside the bucket ``uspto-pair``.

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
.open_files_by_fsspec(mode="rb") \
.load_from_zip()
# Logic to process those archive files comes after
for path, filestream in dp:
print(path, filestream)
# gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>
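
One way to continue from here, sketched below under the assumption that the ``.tsv`` members are the
data of interest, is to keep only those entries and parse each one row by row with
``.parse_csv(delimiter="\t")``:

.. code:: python

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
    .open_files_by_fsspec(mode="rb") \
    .load_from_zip() \
    .filter(lambda path_and_stream: path_and_stream[0].endswith(".tsv")) \
    .parse_csv(delimiter="\t")
for row in dp:
    print(row)  # each row is a list of column values from the TSV files
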
3 changes: 2 additions & 1 deletion torchdata/datapipes/iter/load/aisio.py
@@ -49,7 +49,8 @@ def _assert_aistore_version() -> None:
@functional_datapipe("list_files_by_ais")
class AISFileListerIterDataPipe(IterDataPipe[str]):
"""
Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes. (functional name: ``list_files_by_ais``).
Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes
(functional name: ``list_files_by_ais``).
Acceptable prefixes include, but are not limited to, `ais://bucket-name` and `ais://bucket-name/`

Note:
3 changes: 2 additions & 1 deletion torchdata/datapipes/iter/load/fsspec.py
@@ -37,7 +37,8 @@ def _assert_fsspec() -> None:
class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
r"""
Lists the contents of the directory at the provided ``root`` pathname or URL,
and yields the full pathname or URL for each file within the directory.
and yields the full pathname or URL for each file within the
directory (functional name: ``list_files_by_fsspec``).

Args:
root: The root `fsspec` path directory or list of path directories to list files from
2 changes: 1 addition & 1 deletion torchdata/datapipes/iter/load/iopath.py
@@ -43,7 +43,7 @@ def _create_default_pathmanager():
class IoPathFileListerIterDataPipe(IterDataPipe[str]):
r"""
Lists the contents of the directory at the provided ``root`` pathname or URL,
and yields the full pathname or URL for each file within the directory.
and yields the full pathname or URL for each file within the directory (functional name: ``list_files_by_iopath``).

Args:
root: The root local filepath or URL directory or list of roots to list files from