Adding Cloud Storage Provider tutorial section (#812)
Summary:
Pull Request resolved: #812

Adding example of DataPipe usage with cloud storage providers (AWS S3 and GCS) via `fsspec`.

Feel free to run the examples to verify that they work. Let me know what additional information I should provide as well.

![Screen Shot 2022-10-06 at 7 12 13 PM](https://user-images.githubusercontent.com/4935152/194434873-c8edb939-0e43-4622-883b-c1ce88f767f2.png)

![Screen Shot 2022-10-06 at 7 12 25 PM](https://user-images.githubusercontent.com/4935152/194434886-8b730522-0881-4e53-a5b9-75459304c061.png)

Test Plan: Imported from OSS

Reviewed By: ejguan, msaroufim

Differential Revision: D40162645

fbshipit-source-id: 6a7c156714b30bfed2da900b6d47d64f745333e2
NivekT committed Oct 7, 2022
1 parent 9ad8efb commit dd5496c
Showing 4 changed files with 95 additions and 3 deletions.
90 changes: 90 additions & 0 deletions docs/source/tutorial.rst
@@ -294,3 +294,93 @@ The stack of DataPipes can then be constructed using their functional forms (rec
In the above example, ``datapipes1`` and ``datapipes2`` represent the exact same stack of ``IterDataPipe``\s. We
recommend using the functional form of DataPipes.

Working with Cloud Storage Providers
---------------------------------------------

In this section, we show examples of accessing AWS S3 and Google Cloud Storage with the built-in ``fsspec`` DataPipes.
Although only those two providers are discussed here, with additional libraries, ``fsspec`` DataPipes
should allow you to connect with other storage systems as well (`list of known
implementations <https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations>`_).

Let us know on GitHub if you would like support for other cloud storage providers,
or if you have code examples to share with the community.

Accessing AWS S3 with ``fsspec`` DataPipes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This requires the installation of the libraries ``fsspec``
(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``s3fs``
(`s3fs GitHub repo <https://github.com/fsspec/s3fs>`_).

You can list out the files within an S3 bucket directory by passing a path that starts
with ``"s3://BUCKET_NAME"`` to
`FSSpecFileLister <generated/torchdata.datapipes.iter.FSSpecFileLister.html>`_ (``.list_files_by_fsspec(...)``).

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()

You can also open files using `FSSpecFileOpener <generated/torchdata.datapipes.iter.FSSpecFileOpener.html>`_
(``.open_files_by_fsspec(...)``) and stream them
(if supported by the file format).
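
For instance, a minimal sketch that chains the lister and the opener to iterate over raw file
streams might look like this (``BUCKET_NAME`` is a placeholder; depending on the bucket, credentials
or ``anon=True`` may be needed):

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    # List the objects in the bucket, then open each one as a binary stream
    dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()
    dp = dp.open_files_by_fsspec(mode="rb")
    for path, stream in dp:
        print(path, stream)  # each element is a (URL, StreamWrapper) pair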

Note that you can also provide additional parameters via
the argument ``kwargs_for_open``. This can be useful for purposes such as accessing a specific
bucket version, which you can do by passing in ``{"version_id": "SOMEVERSIONID"}`` (see the ``s3fs``
documentation on `bucket version awareness <https://s3fs.readthedocs.io/en/latest/#bucket-version-awareness>`_
for more details). The supported arguments vary by the (cloud) file system that you are accessing.
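
As a sketch, passing ``kwargs_for_open`` to the opener could look like the following
(the bucket, object key, and version ID are placeholders):

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    # Open a specific version of an object; "SOMEVERSIONID" is a placeholder.
    # Note that s3fs may also require the filesystem itself to be version aware.
    dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/data.csv"])
    dp = dp.open_files_by_fsspec(
        mode="rb",
        version_aware=True,  # forwarded to the underlying s3fs filesystem
        kwargs_for_open={"version_id": "SOMEVERSIONID"},
    )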

In the example below, we are streaming the archive by using
`TarArchiveLoader <generated/torchdata.datapipes.iter.TarArchiveLoader.html#>`_ (``.load_from_tar(mode="r|")``),
in contrast with the usual ``mode="r:"``. This allows us to begin processing data inside the archive
without downloading the whole archive into memory first.

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
    dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|")  # Streaming version
    # The rest of data processing logic goes here

Finally, `FSSpecSaver <generated/torchdata.datapipes.iter.FSSpecSaver.html>`_ (``.save_by_fsspec(...)``)
is also available for writing data to the cloud.
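
As a minimal sketch (the bucket and file contents below are placeholders), saving a stream of
``(name, data)`` pairs back to S3 might look like this:

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    # Each element is (name, data); filepath_fn maps the name to a destination URL
    dp = IterableWrapper([("1.txt", b"DATA1"), ("2.txt", b"DATA2")])
    dp = dp.save_by_fsspec(filepath_fn=lambda name: "s3://BUCKET_NAME/" + name, mode="wb")
    for saved_path in dp:
        print(saved_path)  # the destination URL of each saved file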

Accessing Google Cloud Storage (GCS) with ``fsspec`` DataPipes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This requires the installation of the libraries ``fsspec``
(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``gcsfs``
(`gcsfs GitHub repo <https://github.com/fsspec/gcsfs>`_).

You can list out the files within a GCS bucket directory by specifying a path that starts
with ``"gcs://BUCKET_NAME"``. The bucket name in the example below is ``uspto-pair``.

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
    print(list(dp))
    # ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']

Here is an example of loading a zip file ``05900035.zip`` from the directory ``applications``
inside the bucket ``uspto-pair``.

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
            .open_files_by_fsspec(mode="rb") \
            .load_from_zip()
    # Logic to process those archive files comes after
    for path, filestream in dp:
        print(path, filestream)
    # gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>

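From here, the streams can be fed into any downstream DataPipes. As a sketch, one could keep only
the ``.tsv`` members and parse them as tab-separated rows, for example with ``filter`` and
``parse_csv``:

.. code:: python

    # Continue the pipeline above: keep the .tsv members and parse each one as
    # tab-separated values; every yielded element is a list of column values
    dp = dp.filter(lambda t: t[0].endswith(".tsv")).parse_csv(delimiter="\t")
    for row in dp:
        print(row)
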
3 changes: 2 additions & 1 deletion torchdata/datapipes/iter/load/aisio.py
@@ -49,7 +49,8 @@ def _assert_aistore_version() -> None:
 @functional_datapipe("list_files_by_ais")
 class AISFileListerIterDataPipe(IterDataPipe[str]):
     """
-    Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes. (functional name: ``list_files_by_ais``).
+    Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes
+    (functional name: ``list_files_by_ais``).
     Acceptable prefixes include but not limited to - `ais://bucket-name`, `ais://bucket-name/`
     Note:
3 changes: 2 additions & 1 deletion torchdata/datapipes/iter/load/fsspec.py
@@ -37,7 +37,8 @@ def _assert_fsspec() -> None:
 class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
     r"""
     Lists the contents of the directory at the provided ``root`` pathname or URL,
-    and yields the full pathname or URL for each file within the directory.
+    and yields the full pathname or URL for each file within the
+    directory (functional name: ``list_files_by_fsspec``).
     Args:
         root: The root `fsspec` path directory or list of path directories to list files from
2 changes: 1 addition & 1 deletion torchdata/datapipes/iter/load/iopath.py
@@ -43,7 +43,7 @@ def _create_default_pathmanager():
 class IoPathFileListerIterDataPipe(IterDataPipe[str]):
     r"""
     Lists the contents of the directory at the provided ``root`` pathname or URL,
-    and yields the full pathname or URL for each file within the directory.
+    and yields the full pathname or URL for each file within the directory (functional name: ``list_files_by_iopath``).
     Args:
         root: The root local filepath or URL directory or list of roots to list files from
