From 50d832980b8514b35c349eace91f19d36582631f Mon Sep 17 00:00:00 2001
From: Kevin Tse
Date: Fri, 7 Oct 2022 13:42:21 -0700
Subject: [PATCH] Adding Cloud Storage Provider tutorial section (#812)

Summary:
Pull Request resolved: https://github.com/pytorch/data/pull/812

Adding an example of DataPipe usage with cloud storage providers (AWS S3 and GCS) via `fsspec`.

Feel free to run the examples to verify that they work. Let me know what additional information I should provide as well.

![Screen Shot 2022-10-06 at 7 12 13 PM](https://user-images.githubusercontent.com/4935152/194434873-c8edb939-0e43-4622-883b-c1ce88f767f2.png)
![Screen Shot 2022-10-06 at 7 12 25 PM](https://user-images.githubusercontent.com/4935152/194434886-8b730522-0881-4e53-a5b9-75459304c061.png)

Test Plan: Imported from OSS

Reviewed By: ejguan, msaroufim

Differential Revision: D40162645

fbshipit-source-id: 6a7c156714b30bfed2da900b6d47d64f745333e2
---
 docs/source/tutorial.rst                | 90 +++++++++++++++++++++++++
 torchdata/datapipes/iter/load/aisio.py  |  3 +-
 torchdata/datapipes/iter/load/fsspec.py |  3 +-
 torchdata/datapipes/iter/load/iopath.py |  2 +-
 4 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index fd33beac8..3370856ce 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -294,3 +294,93 @@ The stack of DataPipes can then be constructed using their functional forms (rec
 In the above example, ``datapipes1`` and ``datapipes2`` represent the exact same stack of ``IterDataPipe``\s.
 
 We recommend using the functional form of DataPipes.
+
+Working with Cloud Storage Providers
+---------------------------------------------
+
+In this section, we show examples of accessing AWS S3 and Google Cloud Storage with the built-in ``fsspec`` DataPipes.
+Although only those two providers are discussed here, with additional libraries, ``fsspec`` DataPipes
+should allow you to connect with other storage systems as well (`list of known
+implementations <https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations>`_).
+
+Let us know on GitHub if you have a request for support for other cloud storage providers,
+or have code examples to share with the community.
+
+Accessing AWS S3 with ``fsspec`` DataPipes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This requires the installation of the libraries ``fsspec``
+(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``s3fs``
+(`s3fs GitHub repo <https://github.com/fsspec/s3fs>`_).
+
+You can list out the files within an S3 bucket directory by passing a path that starts
+with ``"s3://BUCKET_NAME"`` to
+`FSSpecFileLister <https://pytorch.org/data/main/generated/torchdata.datapipes.iter.FSSpecFileLister.html>`_ (``.list_files_by_fsspec(...)``).
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()
+
+You can also open files using `FSSpecFileOpener <https://pytorch.org/data/main/generated/torchdata.datapipes.iter.FSSpecFileOpener.html>`_
+(``.open_files_by_fsspec(...)``) and stream them
+(if supported by the file format).
+
+Note that you can also provide additional parameters via
+the argument ``kwargs_for_open``. This can be useful for purposes such as accessing a specific
+bucket version, which you can do by passing in ``{"version_id": "SOMEVERSIONID"}`` (more `details
+about S3 bucket version awareness <https://s3fs.readthedocs.io/en/latest/#bucket-version-awareness>`_
+from ``s3fs``). The supported arguments vary by the (cloud) file system that you are accessing.
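+For instance, a minimal sketch of reading a specific object version might look like the
+following (``BUCKET_NAME``, ``FILENAME``, and the version ID are hypothetical placeholders,
+and the bucket is assumed to have versioning enabled):
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    dp = IterableWrapper(["s3://BUCKET_NAME/FILENAME"])
+    # Extra keyword arguments (e.g. version_aware) are forwarded to the fsspec
+    # filesystem, while kwargs_for_open is forwarded to the underlying open() call
+    dp = dp.open_files_by_fsspec(
+        mode="rb", version_aware=True, kwargs_for_open={"version_id": "SOMEVERSIONID"}
+    )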
+In the example below, we are streaming the archive by using
+`TarArchiveLoader <https://pytorch.org/data/main/generated/torchdata.datapipes.iter.TarArchiveLoader.html>`_ (``.load_from_tar(mode="r|")``),
+in contrast with the usual ``mode="r:"``. This allows us to begin processing data inside the archive
+without downloading the whole archive into memory first.
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
+    dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|")  # Streaming version
+    # The rest of data processing logic goes here
+
+Finally, `FSSpecSaver <https://pytorch.org/data/main/generated/torchdata.datapipes.iter.FSSpecSaver.html>`_
+(``.save_by_fsspec(...)``) is also available for writing data to the cloud, as sketched below.
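+As a minimal sketch (``BUCKET_NAME`` is again a placeholder, and this assumes you have write
+credentials configured for the bucket), the saver consumes tuples of metadata and data, with
+``filepath_fn`` mapping each metadata entry to a destination path:
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    def filepath_fn(name: str) -> str:
+        # Map each file name to its destination path in the bucket
+        return "s3://BUCKET_NAME/" + name
+
+    name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2"}
+    dp = IterableWrapper(name_to_data.items())
+    # Writes each item and yields the path of the saved file
+    dp = dp.save_by_fsspec(mode="wb", filepath_fn=filepath_fn)
+    for saved_path in dp:
+        print(saved_path)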
+Accessing Google Cloud Storage (GCS) with ``fsspec`` DataPipes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This requires the installation of the libraries ``fsspec``
+(`documentation <https://filesystem-spec.readthedocs.io/en/latest/>`_) and ``gcsfs``
+(`gcsfs GitHub repo <https://github.com/fsspec/gcsfs>`_).
+
+You can list out the files within a GCS bucket directory by specifying a path that starts
+with ``"gcs://BUCKET_NAME"``. The bucket name in the example below is ``uspto-pair``.
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
+    print(list(dp))
+    # ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']
+
+Here is an example of loading a zip file ``05900035.zip`` from a bucket named ``uspto-pair`` inside the
+directory ``applications``.
+
+.. code:: python
+
+    from torchdata.datapipes.iter import IterableWrapper
+
+    dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
+        .open_files_by_fsspec(mode="rb") \
+        .load_from_zip()
+    # Logic to process those archive files comes after
+    for path, filestream in dp:
+        print(path, filestream)
+    # gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
+    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
+    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
+    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
+    # gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>
diff --git a/torchdata/datapipes/iter/load/aisio.py b/torchdata/datapipes/iter/load/aisio.py
index b3e25f7aa..be2476bf9 100644
--- a/torchdata/datapipes/iter/load/aisio.py
+++ b/torchdata/datapipes/iter/load/aisio.py
@@ -49,7 +49,8 @@ def _assert_aistore_version() -> None:
 @functional_datapipe("list_files_by_ais")
 class AISFileListerIterDataPipe(IterDataPipe[str]):
     """
-    Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes. (functional name: ``list_files_by_ais``).
+    Iterable Datapipe that lists files from the AIStore backends with the given URL prefixes
+    (functional name: ``list_files_by_ais``).
     Acceptable prefixes include but not limited to - `ais://bucket-name`, `ais://bucket-name/`
 
     Note:
diff --git a/torchdata/datapipes/iter/load/fsspec.py b/torchdata/datapipes/iter/load/fsspec.py
index aa5ec8e44..6553157a6 100644
--- a/torchdata/datapipes/iter/load/fsspec.py
+++ b/torchdata/datapipes/iter/load/fsspec.py
@@ -37,7 +37,8 @@ def _assert_fsspec() -> None:
 class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
     r"""
     Lists the contents of the directory at the provided ``root`` pathname or URL,
-    and yields the full pathname or URL for each file within the directory.
+    and yields the full pathname or URL for each file within the
+    directory (functional name: ``list_files_by_fsspec``).
 
     Args:
         root: The root `fsspec` path directory or list of path directories to list files from
diff --git a/torchdata/datapipes/iter/load/iopath.py b/torchdata/datapipes/iter/load/iopath.py
index 61a3db2dd..62fc9bf96 100644
--- a/torchdata/datapipes/iter/load/iopath.py
+++ b/torchdata/datapipes/iter/load/iopath.py
@@ -43,7 +43,7 @@ def _create_default_pathmanager():
 class IoPathFileListerIterDataPipe(IterDataPipe[str]):
     r"""
     Lists the contents of the directory at the provided ``root`` pathname or URL,
-    and yields the full pathname or URL for each file within the directory.
+    and yields the full pathname or URL for each file within the directory (functional name: ``list_files_by_iopath``).
 
     Args:
         root: The root local filepath or URL directory or list of roots to list files from