Update documentation for S3 DataPipes (#534)
Summary:
Fixes #523

- Make the in-line docs the single source of truth
- Add `sharding` and `shuffle` to the example for distributed training
- Rephrase each argument based on the offline discussion with ydaiming

Pull Request resolved: #534

Reviewed By: NivekT

Differential Revision: D37348999

Pulled By: ejguan

fbshipit-source-id: 6cf76a26bdeeb0842aff92ca9b0be53801acb29f
ejguan authored and facebook-github-bot committed Jun 23, 2022
1 parent 8682f6e commit cc3f866
Showing 2 changed files with 37 additions and 70 deletions.
45 changes: 7 additions & 38 deletions torchdata/datapipes/iter/load/README.md
@@ -12,13 +12,13 @@ S3 IO datapipes are included when building with flag `BUILD_S3=1`. The following
source with S3 datapipes.

```bash
pip uninstall torchdata -y
git clone https://github.com/pytorch/data.git
cd data
python setup.py clean
BUILD_S3=1 python setup.py install
```

We also offer nightly and official (>=0.4.0) TorchData releases integrated with `AWSSDK` on most platforms.
Please check the [link](https://github.com/pytorch/data/tree/main/packaging#awssdk) for the list of supported platforms
with prebuilt binaries.

If you'd like to use customized installations of `pybind11` or `aws-sdk-cpp`, you may set the following flags when
building from source.

@@ -31,43 +31,12 @@ USE_SYSTEM_LIBS=1 # uses both pre-installed pybind11 and aws-sdk-cpp
Note: refer to the official documentation for detailed installation instructions of
[aws-sdk-cpp](https://github.com/aws/aws-sdk-cpp).

## Using S3 IO datapipes

### S3FileLister

`S3FileLister` accepts a list of S3 prefixes and iterates over all matching S3 URLs. The functional API is
`list_files_by_s3`. Acceptable prefixes include `s3://bucket-name`, `s3://bucket-name/`, `s3://bucket-name/folder`,
`s3://bucket-name/folder/`, and `s3://bucket-name/prefix`. You may also set `length`, `request_timeout_ms` (default 3000
ms in aws-sdk-cpp), and `region`. Note that:

1. Input **must** be a list and direct S3 URLs are skipped.
2. `length` is `-1` by default, and any call to `__len__()` is invalid, because the length is unknown until all files
are iterated.
3. `request_timeout_ms` and `region` will overwrite settings in the configuration file or environment variables.
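
As a quick illustration of the options above, here is a minimal sketch; the prefix, timeout, and region values are placeholders, and it assumes TorchData was built with `BUILD_S3=1` or installed from a binary that bundles `AWSSDK`.

```py
from torchdata.datapipes.iter import IterableWrapper, S3FileLister

# Placeholder prefix; any of the prefix forms listed above works.
s3_prefixes = IterableWrapper(['s3://bucket-name/folder/'])

# request_timeout_ms and region are optional and override the values from the
# AWS configuration file or environment variables.
dp_s3_urls = S3FileLister(s3_prefixes, request_timeout_ms=5000, region='us-west-2')

for url in dp_s3_urls:
    print(url)  # each item is a matching s3:// URL
```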

### S3FileLoader

`S3FileLoader` accepts a list of S3 URLs and iterates over all files, yielding `(url, BytesIO)` tuples. The
functional API is `load_files_by_s3`. You may also set `request_timeout_ms` (default 3000 ms in aws-sdk-cpp), `region`,
`buffer_size` (default 120 MB), and `multi_part_download` (multi-part downloading is enabled by default). Note that:

1. Input **must** be a list and S3 URLs must be valid.
2. `request_timeout_ms` and `region` will overwrite settings in the configuration file or environment variables.
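
The optional parameters can be passed as keyword arguments; a minimal sketch, with illustrative values only (`buffer_size` is interpreted in MB, matching the default noted above):

```py
from torchdata.datapipes.iter import IterableWrapper, S3FileLister, S3FileLoader

# Placeholder prefix; requires a TorchData build/binary with the AWSSDK extension.
dp_s3_urls = S3FileLister(IterableWrapper(['s3://bucket-name/folder/']))

# A larger buffer and multi-part downloading help with big objects; both are optional.
dp_s3_files = S3FileLoader(dp_s3_urls, buffer_size=256, multi_part_download=True)

for url, stream in dp_s3_files:
    data = stream.read()  # stream wraps the downloaded bytes (BytesIO)
```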

### Example

```py
from torchdata.datapipes.iter import S3FileLister, S3FileLoader

s3_prefixes = ['s3://bucket-name/folder/', ...]
dp_s3_urls = S3FileLister(s3_prefixes)
dp_s3_files = S3FileLoader(dp_s3_urls)  # outputs (url, StreamWrapper(BytesIO)) tuples
# more datapipes to convert the loaded bytes, e.g.
datapipe = dp_s3_files.parse_csv(delimiter=' ')

for d in datapipe:  # Start loading data
    pass
```

Please refer to the documentation:

- [`S3FileLister`](https://pytorch.org/data/main/generated/torchdata.datapipes.iter.S3FileLister.html#s3filelister)
- [`S3FileLoader`](https://pytorch.org/data/main/generated/torchdata.datapipes.iter.S3FileLoader.html#s3fileloader)

### Note

62 changes: 30 additions & 32 deletions torchdata/datapipes/iter/load/s3io.py
@@ -17,35 +17,30 @@
class S3FileListerIterDataPipe(IterDataPipe[str]):
r"""
Iterable DataPipe that lists Amazon S3 file URLs with the given prefixes (functional name: ``list_files_by_s3``).
Acceptable prefixes include ``s3://bucket-name``, ``s3://bucket-name/``, ``s3://bucket-name/folder``,
``s3://bucket-name/folder/``, and ``s3://bucket-name/prefix``.
Note:
1. ``source_datapipe`` **must** contain a list of valid S3 URLs.
2. ``length`` is `-1` by default, and any call to ``__len__()`` is invalid, because the length is unknown
until all files are iterated.
3. ``request_timeout_ms`` and ``region`` will overwrite settings in the configuration file or
environment variables.
4. AWS_CPP_SDK is necessary to use the S3 DataPipe(s).
Args:
source_datapipe: a DataPipe that contains URLs/URL prefixes to s3 files
length: Nominal length of the datapipe
request_timeout_ms: timeout setting for each request (3,000 ms by default)
region: region to access the files (inferred from credentials by default)
Example:
>>> from torchdata.datapipes.iter import IterableWrapper, S3FileLister
>>> s3_prefixes = IterableWrapper(['s3://bucket-name/folder/', ...])
>>> dp_s3_urls = S3FileLister(s3_prefixes)
>>> for d in dp_s3_urls:
...     pass
# Functional API
>>> dp_s3_urls = s3_prefixes.list_files_by_s3(request_timeout_ms=100)
>>> for d in dp_s3_urls:
...     pass
"""

@@ -77,31 +72,34 @@ class S3FileLoaderIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
r"""
Iterable DataPipe that loads Amazon S3 files from the given S3 URLs (functional name: ``load_files_by_s3``).
``S3FileLoader`` iterates over all the given S3 URLs and yields their contents as ``(url, BytesIO)`` tuples.
Note:
1. ``source_datapipe`` **must** contain a list of valid S3 URLs.
2. ``request_timeout_ms`` and ``region`` will overwrite settings in the
configuration file or environment variables.
3. AWS_CPP_SDK is necessary to use the S3 DataPipe(s).
Args:
source_datapipe: a DataPipe that contains URLs to s3 files
request_timeout_ms: timeout setting for each request (3,000 ms by default)
region: region to access the files (inferred from credentials by default)
buffer_size: buffer size of each chunk to download large files progressively (128 MB by default)
multi_part_download: flag to split each chunk into small packets and download those packets in parallel (enabled by default)
Example:
>>> from torchdata.datapipes.iter import IterableWrapper, S3FileLoader
>>> dp_s3_urls = IterableWrapper(['s3://bucket-name/folder/', ...]).list_files_by_s3()
# In order to make sure data are shuffled and sharded in the
# distributed environment, `shuffle` and `sharding_filter`
# are required. For details, please check our tutorial at:
# https://pytorch.org/data/main/tutorial.html#working-with-dataloader
>>> sharded_s3_urls = dp_s3_urls.shuffle().sharding_filter()
>>> dp_s3_files = S3FileLoader(sharded_s3_urls)
>>> for url, fd in dp_s3_files: # Start loading data
...     data = fd.read()
# Functional API
>>> dp_s3_files = sharded_s3_urls.load_files_by_s3(buffer_size=256)
>>> for url, fd in dp_s3_files:
... data = fd.read()
"""

def __init__(
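To show the `shuffle`/`sharding_filter` pattern from the docstring above end to end, here is a hedged sketch that consumes the pipeline with a standard `DataLoader`; the bucket prefix, the `_read_bytes` helper, and the `num_workers` value are illustrative rather than part of the library, and the linked tutorial covers the fully distributed setup.

```py
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper


def _read_bytes(item):
    # item is a (url, StreamWrapper) tuple produced by load_files_by_s3
    url, stream = item
    return url, stream.read()


dp = (
    IterableWrapper(['s3://bucket-name/folder/'])
    .list_files_by_s3()
    .shuffle()           # shuffle URLs before sharding, as the note above recommends
    .sharding_filter()   # each DataLoader worker keeps only its own shard of URLs
    .load_files_by_s3()
    .map(_read_bytes)    # materialize the bytes inside the workers
)

# batch_size=None passes samples through without collation.
for url, payload in DataLoader(dp, batch_size=None, num_workers=2):
    pass  # consume (url, bytes) pairs
```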
