Single key option for Slicer and doc improvements (#1041)
Summary:
Single key option for Slicer and doc improvements

### Changes

- Enable Slicer to also work for a single key + functional test (see the usage sketch below)
- Fix typos in doc
- Add laion-example to examples page

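A minimal usage sketch of the new single-key behavior (values mirror the functional test added in this PR):

>>> from torchdata.datapipes.iter import IterableWrapper
>>> dp = IterableWrapper([{"a": 1, "b": 2, "c": 3}, {"a": 3, "b": 4, "c": 5}])
>>> list(dp.slice("a"))
[{'a': 1}, {'a': 3}]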
Pull Request resolved: #1041

Reviewed By: NivekT

Differential Revision: D43622504

Pulled By: ejguan

fbshipit-source-id: b656082598f4a790dc457dddb0213a1a180239fd
SvenDS9 authored and NivekT committed Feb 28, 2023
1 parent b57545f commit 63f9f6c
Showing 7 changed files with 47 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/source/dp_tutorial.rst
@@ -72,7 +72,7 @@ into the ``DataLoader``. For detailed documentation related to ``DataLoader``,
please visit `this PyTorch Core page <https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading>`_.


Please refer to `this page <dlv2_tutorial.html>`_ about using ``DataPipe`` with ``DataLoader2``.
Please refer to :doc:`this page <dlv2_tutorial>` about using ``DataPipe`` with ``DataLoader2``.


For this example, we will first have a helper function that generates some CSV files with random label and data.
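The helper itself is collapsed in this diff view; the following is only a sketch of such a generator (the name ``generate_csv`` and its defaults are illustrative assumptions, not necessarily the tutorial's exact code):

import csv
import random

def generate_csv(file_label, num_rows=5000, num_features=20):
    # Hypothetical helper: each row holds a random integer label followed by
    # random float feature values.
    with open(f"sample_data{file_label}.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["label"] + [f"c{i}" for i in range(num_features)])
        for _ in range(num_rows):
            writer.writerow([random.randint(0, 9)] + [round(random.random(), 4) for _ in range(num_features)])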
7 changes: 7 additions & 0 deletions docs/source/examples.rst
@@ -75,6 +75,13 @@ semantic classes. Here is a
<https://github.com/tcapelle/torchdata/blob/main/01_Camvid_segmentation_with_datapipes.ipynb>`_
created by our community.

laion2B-en-joined
^^^^^^^^^^^^^^^^^^^^^^
The `laion2B-en-joined dataset <https://huggingface.co/datasets/laion/laion2B-en-joined>`_ is a subset of the `LAION-5B dataset <https://laion.ai/blog/laion-5b/>`_ containing English captions, URLs pointing to images,
and other metadata. It contains around 2.32 billion entries.
Currently (February 2023) around 86% of the URLs still point to valid images. Here is a `DataPipe implementation of laion2B-en-joined
<https://github.com/pytorch/data/blob/main/examples/vision/laion5b.py>`_ that filters out unsafe images and images with watermarks and loads the images from the URLs.
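A condensed sketch of that pipeline is shown below; the ``HuggingFaceHubReader`` DataPipe and the ``punsafe``/``pwatermark`` metadata columns and thresholds are assumptions drawn from the linked example, not a verbatim copy:

from torchdata.datapipes.iter import HuggingFaceHubReader

def is_sfw(sample):
    # Assumed metadata column: probability that the image is unsafe
    return sample["punsafe"] is not None and sample["punsafe"] < 0.5

def has_no_watermark(sample):
    # Assumed metadata column: probability that the image carries a watermark
    return sample["pwatermark"] is not None and sample["pwatermark"] < 0.8

dp = HuggingFaceHubReader("laion/laion2B-en-joined")
dp = dp.filter(is_sfw).filter(has_no_watermark)
# Each remaining entry still carries its URL, which can then be downloaded and decoded into an image.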

Additional Datasets in TorchVision
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In a separate PyTorch domain library `TorchVision <https://github.com/pytorch/vision>`_, you will find some of the most
10 changes: 5 additions & 5 deletions docs/source/reading_service.rst
@@ -13,11 +13,11 @@ Features
Dynamic Sharding
^^^^^^^^^^^^^^^^

Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService`` to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of ``DataPipe`` letting users to define the sharding place within the pipeline.
Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService`` to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of ``DataPipe`` letting users define the sharding place within the pipeline.

- ``sharding_filter`` (:class:`ShardingFilter`): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the ``DataPipe`` graph, while skipping samples that do not belong to the corresponding worker at the point where ``sharding_filter`` is placed.

- ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distributed data to the subsequent worker processes.
- ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.

The following is an example of having two types of sharding strategies in the pipeline.

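(The snippet itself is collapsed in this view; what follows is a minimal sketch of the idea, assuming the usual functional forms and the ``SHARDING_PRIORITIES`` import from PyTorch core.)

from torchdata.datapipes.iter import IterableWrapper
from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES

# Non-replicable branch: a single dispatching process loads these elements and
# distributes them round-robin to the worker processes.
dispatch_dp = IterableWrapper(range(10)).shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)

# Replicable branch: every worker holds its own copy of this part of the graph,
# and sharding_filter skips the samples that belong to other workers.
replica_dp = IterableWrapper(range(10)).shuffle().sharding_filter()

dp = dispatch_dp.zip(replica_dp)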
@@ -116,21 +116,21 @@ When multiprocessing takes place, the graph becomes:
end [shape=box];
}

``Client`` in the graph is a ``DataPipe`` that send request and receive response from multiprocessing queues.
``Client`` in the graph is a ``DataPipe`` that sends a request and receives a response from multiprocessing queues.

.. module:: torchdata.dataloader2

Determinism
^^^^^^^^^^^^

In ``DataLoader2``, a ``SeedGenerator`` becomes a single source of randomness and each ``ReadingService`` would access to it via ``initialize_iteration()`` and generate corresponding random seeds for random ``DataPipe`` operations.
In ``DataLoader2``, a ``SeedGenerator`` becomes a single source of randomness and each ``ReadingService`` would access it via ``initialize_iteration()`` and generate corresponding random seeds for random ``DataPipe`` operations.

In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, ``MultiProcessingReadingService`` and ``DistributedReadingService`` would help :class:`DataLoader2` to synchronize random states for any random ``DataPipe`` operation prior to ``sharding_filter`` or ``sharding_round_robin_dispatch``. For the remaining ``DataPipe`` operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ``ReadingService``, in order to perform different random transformations.
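As a minimal sketch of what this means in practice (the two-worker setup below is illustrative), randomness placed before ``sharding_filter`` is exactly what the reading service keeps in sync across workers:

from torchdata.datapipes.iter import IterableWrapper
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

# The shuffle sits before sharding_filter, so the reading service can seed it
# identically in every worker and the resulting shards stay mutually exclusive.
dp = IterableWrapper(range(100)).shuffle().sharding_filter()
dl = DataLoader2(dp, reading_service=MultiProcessingReadingService(num_workers=2))
results = list(dl)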

Graph Mode
^^^^^^^^^^^

This also allows easier transition of data-preprocessing pipeline from research to production. After the ``DataPipe`` graph is created and validated with the ``ReadingServices``, a different ``ReadingService`` that configures and connects to the production service/infra such as ``AIStore`` can be provided to :class:`DataLoader2` as a drop-in replacement. The ``ReadingService`` could potentially search the graph, and find ``DataPipe`` operations that can be delegated to the production service/infra, then modify the graph correspondingly to achieve higher-performant execution.
This also allows easier transition of data-preprocessing pipeline from research to production. After the ``DataPipe`` graph is created and validated with the ``ReadingServices``, a different ``ReadingService`` that configures and connects to the production service/infrastructure such as ``AIStore`` can be provided to :class:`DataLoader2` as a drop-in replacement. The ``ReadingService`` could potentially search the graph, and find ``DataPipe`` operations that can be delegated to the production service/infrastructure, then modify the graph correspondingly to achieve higher-performant execution.

Extend ReadingService
----------------------
5 changes: 3 additions & 2 deletions docs/source/torchdata.datapipes.utils.rst
@@ -15,7 +15,7 @@ DataPipe Graph Visualization

to_graph

Commond Utility Functions
Common Utility Functions
--------------------------------------
.. currentmodule:: torchdata.datapipes.utils

@@ -47,7 +47,8 @@ For documentation related to DataLoader, please refer to the
``torch.utils.data`` `documentation <https://pytorch.org/docs/stable/data.html>`_. Or, more specifically, the
`DataLoader API section <https://pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader>`_.

DataLoader v2 is currently in development. You should see an update here by mid-2022.
DataLoader v2 is currently in development. For more information please refer to :doc:`dataloader2`.


Sampler
-------------------------------------
10 changes: 7 additions & 3 deletions test/test_iterdatapipe.py
@@ -1230,7 +1230,7 @@ def test_slice_iterdatapipe(self):
slice_dp = input_dp.slice(0, 2, 2)
self.assertEqual([(0,), (3,), (6,)], list(slice_dp))

# Functional Test: filter with list of indices for tuple
# Functional Test: slice with list of indices for tuple
slice_dp = input_dp.slice([0, 1])
self.assertEqual([(0, 1), (3, 4), (6, 7)], list(slice_dp))

@@ -1245,14 +1245,18 @@ def test_slice_iterdatapipe(self):
slice_dp = input_dp.slice(0, 2)
self.assertEqual([[0, 1], [3, 4], [6, 7]], list(slice_dp))

# Functional Test: filter with list of indices for list
# Functional Test: slice with list of indices for list
slice_dp = input_dp.slice(0, 2)
self.assertEqual([[0, 1], [3, 4], [6, 7]], list(slice_dp))

# dict tests
input_dp = IterableWrapper([{"a": 1, "b": 2, "c": 3}, {"a": 3, "b": 4, "c": 5}, {"a": 5, "b": 6, "c": 7}])

# Functional Test: filter with list of indices for dict
# Functional Test: slice with key for dict
slice_dp = input_dp.slice("a")
self.assertEqual([{"a": 1}, {"a": 3}, {"a": 5}], list(slice_dp))

# Functional Test: slice with list of keys for dict
slice_dp = input_dp.slice(["a", "b"])
self.assertEqual([{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}], list(slice_dp))

31 changes: 23 additions & 8 deletions torchdata/datapipes/iter/transform/callable.py
@@ -26,14 +26,15 @@ def _no_op_fn(*args):
class BatchMapperIterDataPipe(IterDataPipe[T_co]):
r"""
Combines elements from the source DataPipe to batches and applies a function
over each batch, then flattens the outpus to a single, unnested IterDataPipe
over each batch, then flattens the outputs to a single, unnested IterDataPipe
(functional name: ``map_batches``).
Args:
datapipe: Source IterDataPipe
fn: The function to be applied to each batch of data
batch_size: The size of batch to be aggregated from ``datapipe``
input_col: Index or indices of data which ``fn`` is applied, such as:
- ``None`` as default to apply ``fn`` to the data directly.
- Integer(s) is used for list/tuple.
- Key(s) is used for dict.
@@ -114,6 +115,7 @@ class FlatMapperIterDataPipe(IterDataPipe[T_co]):
datapipe: Source IterDataPipe
fn: the function to be applied to each element in the DataPipe, the output must be a Sequence
input_col: Index or indices of data which ``fn`` is applied, such as:
- ``None`` as default to apply ``fn`` to the data directly.
- Integer(s) is/are used for list/tuple.
- Key(s) is/are used for dict.
@@ -133,9 +135,9 @@ class FlatMapperIterDataPipe(IterDataPipe[T_co]):
[1, 2, 3, 4, 5, 6]
"""
datapipe: IterDataPipe
fn: Callable
fn: Optional[Callable]

def __init__(self, datapipe: IterDataPipe, fn: Callable = None, input_col=None) -> None:
def __init__(self, datapipe: IterDataPipe, fn: Optional[Callable] = None, input_col=None) -> None:
self.datapipe = datapipe

if fn is None:
@@ -147,12 +149,12 @@ def __init__(self, datapipe: IterDataPipe, fn: Callable = None, input_col=None)

def _apply_fn(self, data):
if self.input_col is None:
return self.fn(data)
return self.fn(data) # type: ignore[misc]
elif isinstance(self.input_col, (list, tuple)):
args = tuple(data[col] for col in self.input_col)
return self.fn(*args)
return self.fn(*args) # type: ignore[misc]
else:
return self.fn(data[self.input_col])
return self.fn(data[self.input_col]) # type: ignore[misc]

def __iter__(self) -> Iterator[T_co]:
for d in self.datapipe:
@@ -171,6 +173,9 @@ class DropperIterDataPipe(IterDataPipe[T_co]):
datapipe: IterDataPipe with columns to be dropped
indices: a single column index to be dropped or a list of indices
- Integer(s) is/are used for list/tuple.
- Key(s) is/are used for dict.
Example:
>>> from torchdata.datapipes.iter import IterableWrapper, ZipperMapDataPipe
>>> dp1 = IterableWrapper(range(5))
@@ -237,8 +242,13 @@ class SliceIterDataPipe(IterDataPipe[T_co]):
Args:
datapipe: IterDataPipe with iterable elements
index: a single start index for the slice or a list of indices to be returned instead of a start/stop slice
stop: the slice stop. ignored if index is a list
step: step to be taken from start to stop. ignored if index is a list
- Integer(s) is/are used for list/tuple.
- Key(s) is/are used for dict.
stop: the slice stop. ignored if index is a list or if element is a dict
step: step to be taken from start to stop. ignored if index is a list or if element is a dict
Example:
>>> from torchdata.datapipes.iter import IterableWrapper
@@ -285,6 +295,8 @@ def __iter__(self) -> Iterator[T_co]:
elif isinstance(old_item, dict):
if isinstance(self.index, list):
new_item = {k: v for (k, v) in old_item.items() if k in self.index} # type: ignore[assignment]
elif self.index in old_item.keys():
new_item = {self.index: old_item.get(self.index)} # type: ignore[assignment]
else:
new_item = old_item # type: ignore[assignment]
warnings.warn(
@@ -329,6 +341,9 @@ class FlattenIterDataPipe(IterDataPipe[T_co]):
datapipe: IterDataPipe with iterable elements
indices: a single index/key for the item to flatten from an iterator item or a list of indices/keys to be flattened
- Integer(s) is/are used for list/tuple.
- Key(s) is/are used for dict.
Example:
>>> from torchdata.datapipes.iter import IterableWrapper
>>> dp = IterableWrapper([(0, 10, (100, 1000)), (1, 11, (111, 1001)), (2, 12, (122, 1002)), (3, 13, (133, 1003)), (4, 14, (144, 1004))])
2 changes: 1 addition & 1 deletion torchdata/datapipes/map/util/unzipper.py
@@ -31,7 +31,7 @@ class UnZipperMapDataPipe(MapDataPipe):
an integer from 0 to sequence_length - 1)
Example:
>>> from torchdata.datapipes.iter import SequenceWrapper
>>> from torchdata.datapipes.map import SequenceWrapper
>>> source_dp = SequenceWrapper([(i, i + 10, i + 20) for i in range(3)])
>>> dp1, dp2, dp3 = source_dp.unzip(sequence_length=3)
>>> list(dp1)