diff --git a/docs/source/dp_tutorial.rst b/docs/source/dp_tutorial.rst
index 8e3cf3579..7ec7d897a 100644
--- a/docs/source/dp_tutorial.rst
+++ b/docs/source/dp_tutorial.rst
@@ -72,7 +72,7 @@
 into the ``DataLoader``. For detailed documentation related to ``DataLoader``,
 please visit `this PyTorch Core page `_.
 
-Please refer to `this page `_ about using ``DataPipe`` with ``DataLoader2``.
+Please refer to :doc:`this page ` about using ``DataPipe`` with ``DataLoader2``.
 
 For this example, we will first have a helper function that generates some CSV files with random label and data.
 
diff --git a/docs/source/examples.rst b/docs/source/examples.rst
index fdd96affc..ed5535551 100644
--- a/docs/source/examples.rst
+++ b/docs/source/examples.rst
@@ -75,6 +75,13 @@
 semantic classes. Here is a
 `_ created by our community.
 
+laion2B-en-joined
+^^^^^^^^^^^^^^^^^^^^^^
+The `laion2B-en-joined dataset `_ is a subset of the `LAION-5B dataset `_ containing English captions, URLs pointing to images,
+and other metadata. It contains around 2.32 billion entries.
+Currently (February 2023) around 86% of the URLs still point to valid images. Here is a `DataPipe implementation of laion2B-en-joined
+`_ that filters out unsafe images and images with watermarks and loads the images from the URLs.
+
 Additional Datasets in TorchVision
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 In a separate PyTorch domain library `TorchVision `_, you will find some of the most
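The laion2B-en-joined entry above describes a filter-then-fetch pipeline. Below is a minimal sketch of that shape, assuming metadata columns named ``URL``, ``pwatermark``, and ``punsafe`` with illustrative thresholds; none of this is taken from the linked implementation::

    from torchdata.datapipes.iter import HttpReader, IterableWrapper

    # Stand-in metadata rows; the real dataset distributes its metadata in bulk files.
    rows = IterableWrapper(
        [
            {"URL": "https://example.com/a.jpg", "pwatermark": 0.1, "punsafe": 0.05},
            {"URL": "https://example.com/b.jpg", "pwatermark": 0.9, "punsafe": 0.7},
        ]
    )

    # Keep rows that are unlikely to be watermarked or unsafe (assumed thresholds).
    safe_rows = rows.filter(lambda row: row["pwatermark"] < 0.8 and row["punsafe"] < 0.5)

    # Fetch the remaining images; HttpReader yields (url, byte stream) tuples.
    images = HttpReader(safe_rows.map(lambda row: row["URL"]))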
diff --git a/docs/source/reading_service.rst b/docs/source/reading_service.rst
index f16100a28..e5836a407 100644
--- a/docs/source/reading_service.rst
+++ b/docs/source/reading_service.rst
@@ -13,11 +13,11 @@ Features
 Dynamic Sharding
 ^^^^^^^^^^^^^^^^
 
-Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService`` to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of ``DataPipe`` letting users to define the sharding place within the pipeline.
+Dynamic sharding is achieved by ``MultiProcessingReadingService`` and ``DistributedReadingService`` to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of ``DataPipe`` letting users define the sharding place within the pipeline.
 
 - ``sharding_filter`` (:class:`ShardingFilter`): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of the ``DataPipe`` graph, while skipping samples that do not belong to the corresponding worker at the point where ``sharding_filter`` is placed.
-- ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distributed data to the subsequent worker processes.
+- ``sharding_round_robin_dispatch`` (:class:`ShardingRoundRobinDispatcher`): When there is any ``sharding_round_robin_dispatch`` ``DataPipe`` in the pipeline, that branch (i.e. all DataPipes prior to ``sharding_round_robin_dispatch``) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.
 
 The following is an example of having two types of sharding strategies in the pipeline.
 
@@ -116,21 +116,21 @@ When multiprocessing takes place, the graph becomes:
         end [shape=box];
     }
 
-``Client`` in the graph is a ``DataPipe`` that send request and receive response from multiprocessing queues.
+``Client`` in the graph is a ``DataPipe`` that sends a request and receives a response from multiprocessing queues.
 
 .. module:: torchdata.dataloader2
 
 Determinism
 ^^^^^^^^^^^^
 
-In ``DataLoader2``, a ``SeedGenerator`` becomes a single source of randomness and each ``ReadingService`` would access to it via ``initialize_iteration()`` and generate corresponding random seeds for random ``DataPipe`` operations.
+In ``DataLoader2``, a ``SeedGenerator`` becomes a single source of randomness and each ``ReadingService`` would access it via ``initialize_iteration()`` and generate corresponding random seeds for random ``DataPipe`` operations.
 
 In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, ``MultiProcessingReadingService`` and ``DistributedReadingService`` would help :class:`DataLoader2` to synchronize random states for any random ``DataPipe`` operation prior to ``sharding_filter`` or ``sharding_round_robin_dispatch``. For the remaining ``DataPipe`` operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ``ReadingService``, in order to perform different random transformations.
 
 Graph Mode
 ^^^^^^^^^^^
 
-This also allows easier transition of data-preprocessing pipeline from research to production. After the ``DataPipe`` graph is created and validated with the ``ReadingServices``, a different ``ReadingService`` that configures and connects to the production service/infra such as ``AIStore`` can be provided to :class:`DataLoader2` as a drop-in replacement. The ``ReadingService`` could potentially search the graph, and find ``DataPipe`` operations that can be delegated to the production service/infra, then modify the graph correspondingly to achieve higher-performant execution.
+This also allows an easier transition of the data-preprocessing pipeline from research to production. After the ``DataPipe`` graph is created and validated with the ``ReadingServices``, a different ``ReadingService`` that configures and connects to the production service/infrastructure such as ``AIStore`` can be provided to :class:`DataLoader2` as a drop-in replacement. The ``ReadingService`` could potentially search the graph, find ``DataPipe`` operations that can be delegated to the production service/infrastructure, then modify the graph correspondingly to achieve higher-performance execution.
 
 Extend ReadingService
 ----------------------
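Since the two sharding ``DataPipe`` types above determine how the graph is split across workers, here is a minimal sketch of both placements; the pipeline contents are illustrative, and the APIs shown are the ones named in this section::

    from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
    from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
    from torchdata.datapipes.iter import IterableWrapper

    def times_two(x):
        return x * 2

    # Replicable branch: each worker gets a full copy of the graph and keeps
    # only the samples assigned to it at the point of sharding_filter.
    replicable_dp = IterableWrapper(range(100)).shuffle().sharding_filter().map(times_two)

    # Non-replicable branch: everything before sharding_round_robin_dispatch runs
    # in a single dispatching process that round-robins items to the workers.
    dispatched_dp = (
        IterableWrapper(range(100))
        .shuffle()
        .sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
        .map(times_two)
    )

    if __name__ == "__main__":
        rs = MultiProcessingReadingService(num_workers=2)
        dl = DataLoader2(replicable_dp, reading_service=rs)
        print(list(dl))
        dl.shutdown()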
diff --git a/docs/source/torchdata.datapipes.utils.rst b/docs/source/torchdata.datapipes.utils.rst
index ad96e2430..025b0a474 100644
--- a/docs/source/torchdata.datapipes.utils.rst
+++ b/docs/source/torchdata.datapipes.utils.rst
@@ -15,7 +15,7 @@ DataPipe Graph Visualization
 
     to_graph
 
-Commond Utility Functions
+Common Utility Functions
 --------------------------------------
 
 .. currentmodule:: torchdata.datapipes.utils
@@ -47,7 +47,8 @@
 For documentation related to DataLoader, please refer to the
 ``torch.utils.data`` `documentation `_. Or, more specifically, the
 `DataLoader API section `_.
 
-DataLoader v2 is currently in development. You should see an update here by mid-2022.
+DataLoader v2 is currently in development. For more information, please refer to :doc:`dataloader2`.
+
 
 Sampler
 -------------------------------------
diff --git a/test/test_iterdatapipe.py b/test/test_iterdatapipe.py
index 64f539f26..e9c088de2 100644
--- a/test/test_iterdatapipe.py
+++ b/test/test_iterdatapipe.py
@@ -1230,7 +1230,7 @@ def test_slice_iterdatapipe(self):
         slice_dp = input_dp.slice(0, 2, 2)
         self.assertEqual([(0,), (3,), (6,)], list(slice_dp))
 
-        # Functional Test: filter with list of indices for tuple
+        # Functional Test: slice with list of indices for tuple
         slice_dp = input_dp.slice([0, 1])
         self.assertEqual([(0, 1), (3, 4), (6, 7)], list(slice_dp))
 
@@ -1245,14 +1245,18 @@ def test_slice_iterdatapipe(self):
         slice_dp = input_dp.slice(0, 2)
         self.assertEqual([[0, 1], [3, 4], [6, 7]], list(slice_dp))
 
-        # Functional Test: filter with list of indices for list
+        # Functional Test: slice with list of indices for list
         slice_dp = input_dp.slice(0, 2)
         self.assertEqual([[0, 1], [3, 4], [6, 7]], list(slice_dp))
 
         # dict tests
         input_dp = IterableWrapper([{"a": 1, "b": 2, "c": 3}, {"a": 3, "b": 4, "c": 5}, {"a": 5, "b": 6, "c": 7}])
 
-        # Functional Test: filter with list of indices for dict
+        # Functional Test: slice with key for dict
+        slice_dp = input_dp.slice("a")
+        self.assertEqual([{"a": 1}, {"a": 3}, {"a": 5}], list(slice_dp))
+
+        # Functional Test: slice with list of keys for dict
         slice_dp = input_dp.slice(["a", "b"])
         self.assertEqual([{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}], list(slice_dp))
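The new assertions above pin down how ``slice`` treats dict elements: a single key keeps just that key, and a list of keys keeps the listed keys. A quick usage sketch mirroring the tests::

    from torchdata.datapipes.iter import IterableWrapper

    dp = IterableWrapper([{"a": 1, "b": 2, "c": 3}, {"a": 3, "b": 4, "c": 5}])

    list(dp.slice("a"))         # [{'a': 1}, {'a': 3}]
    list(dp.slice(["a", "b"]))  # [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]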
diff --git a/torchdata/datapipes/iter/transform/callable.py b/torchdata/datapipes/iter/transform/callable.py
index 5ab90bada..2923db1a2 100644
--- a/torchdata/datapipes/iter/transform/callable.py
+++ b/torchdata/datapipes/iter/transform/callable.py
@@ -26,7 +26,7 @@ def _no_op_fn(*args):
 class BatchMapperIterDataPipe(IterDataPipe[T_co]):
     r"""
     Combines elements from the source DataPipe to batches and applies a function
-    over each batch, then flattens the outpus to a single, unnested IterDataPipe
+    over each batch, then flattens the outputs to a single, unnested IterDataPipe
     (functional name: ``map_batches``).
 
     Args:
@@ -34,6 +34,7 @@ class BatchMapperIterDataPipe(IterDataPipe[T_co]):
         fn: The function to be applied to each batch of data
         batch_size: The size of batch to be aggregated from ``datapipe``
         input_col: Index or indices of data which ``fn`` is applied, such as:
 
+            - ``None`` as default to apply ``fn`` to the data directly.
             - Integer(s) is used for list/tuple.
             - Key(s) is used for dict.
@@ -114,6 +115,7 @@ class FlatMapperIterDataPipe(IterDataPipe[T_co]):
         datapipe: Source IterDataPipe
         fn: the function to be applied to each element in the DataPipe, the output must be a Sequence
         input_col: Index or indices of data which ``fn`` is applied, such as:
 
+            - ``None`` as default to apply ``fn`` to the data directly.
             - Integer(s) is/are used for list/tuple.
             - Key(s) is/are used for dict.
@@ -133,9 +135,9 @@ class FlatMapperIterDataPipe(IterDataPipe[T_co]):
         [1, 2, 3, 4, 5, 6]
     """
     datapipe: IterDataPipe
-    fn: Callable
+    fn: Optional[Callable]
 
-    def __init__(self, datapipe: IterDataPipe, fn: Callable = None, input_col=None) -> None:
+    def __init__(self, datapipe: IterDataPipe, fn: Optional[Callable] = None, input_col=None) -> None:
         self.datapipe = datapipe
 
         if fn is None:
@@ -147,12 +149,12 @@ def __init__(self, datapipe: IterDataPipe, fn: Callable = None, input_col=None)
 
     def _apply_fn(self, data):
         if self.input_col is None:
-            return self.fn(data)
+            return self.fn(data)  # type: ignore[misc]
         elif isinstance(self.input_col, (list, tuple)):
             args = tuple(data[col] for col in self.input_col)
-            return self.fn(*args)
+            return self.fn(*args)  # type: ignore[misc]
         else:
-            return self.fn(data[self.input_col])
+            return self.fn(data[self.input_col])  # type: ignore[misc]
 
     def __iter__(self) -> Iterator[T_co]:
         for d in self.datapipe:
@@ -171,6 +173,9 @@ class DropperIterDataPipe(IterDataPipe[T_co]):
         datapipe: IterDataPipe with columns to be dropped
         indices: a single column index to be dropped or a list of indices
 
+            - Integer(s) is/are used for list/tuple.
+            - Key(s) is/are used for dict.
+
     Example:
         >>> from torchdata.datapipes.iter import IterableWrapper, ZipperMapDataPipe
        >>> dp1 = IterableWrapper(range(5))
@@ -237,8 +242,13 @@ class SliceIterDataPipe(IterDataPipe[T_co]):
     Args:
         datapipe: IterDataPipe with iterable elements
         index: a single start index for the slice or a list of indices to be returned instead of a start/stop slice
-        stop: the slice stop. ignored if index is a list
-        step: step to be taken from start to stop. ignored if index is a list
+
+            - Integer(s) is/are used for list/tuple.
+            - Key(s) is/are used for dict.
+
+
+        stop: the slice stop. Ignored if index is a list or if the element is a dict
+        step: step to be taken from start to stop. Ignored if index is a list or if the element is a dict
 
     Example:
         >>> from torchdata.datapipes.iter import IterableWrapper
@@ -285,6 +295,8 @@ def __iter__(self) -> Iterator[T_co]:
             elif isinstance(old_item, dict):
                 if isinstance(self.index, list):
                     new_item = {k: v for (k, v) in old_item.items() if k in self.index}  # type: ignore[assignment]
+                elif self.index in old_item.keys():
+                    new_item = {self.index: old_item.get(self.index)}  # type: ignore[assignment]
                 else:
                     new_item = old_item  # type: ignore[assignment]
                     warnings.warn(
@@ -329,6 +341,9 @@ class FlattenIterDataPipe(IterDataPipe[T_co]):
         datapipe: IterDataPipe with iterable elements
         indices: a single index/key for the item to flatten from an iterator item or a list of indices/keys to be flattened
 
+            - Integer(s) is/are used for list/tuple.
+            - Key(s) is/are used for dict.
+
     Example:
         >>> from torchdata.datapipes.iter import IterableWrapper
         >>> dp = IterableWrapper([(0, 10, (100, 1000)), (1, 11, (111, 1001)), (2, 12, (122, 1002)), (3, 13, (133, 1003)), (4, 14, (144, 1004))])
diff --git a/torchdata/datapipes/map/util/unzipper.py b/torchdata/datapipes/map/util/unzipper.py
index 4fef763a4..99c011ae1 100644
--- a/torchdata/datapipes/map/util/unzipper.py
+++ b/torchdata/datapipes/map/util/unzipper.py
@@ -31,7 +31,7 @@ class UnZipperMapDataPipe(MapDataPipe):
         an integer from 0 to sequence_length - 1)
 
     Example:
-        >>> from torchdata.datapipes.iter import SequenceWrapper
+        >>> from torchdata.datapipes.map import SequenceWrapper
         >>> source_dp = SequenceWrapper([(i, i + 10, i + 20) for i in range(3)])
         >>> dp1, dp2, dp3 = source_dp.unzip(sequence_length=3)
         >>> list(dp1)
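The docstring additions above say that keys play the role of indices for dict elements, and that passing no ``fn`` (the new ``Optional`` default) applies no function and flattens the data directly. A short usage sketch of both behaviors, with expected outputs following the docstrings and tests in this patch::

    from torchdata.datapipes.iter import IterableWrapper

    # drop: a key works for dict elements the way an index works for tuples/lists.
    dicts = IterableWrapper([{"a": 1, "b": 2, "c": 3}, {"a": 3, "b": 4, "c": 5}])
    list(dicts.drop("c"))  # [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]

    # flatmap: with fn=None as the default, each element is flattened one level.
    nested = IterableWrapper([[1, 2], [3, 4], [5, 6]])
    list(nested.flatmap())  # [1, 2, 3, 4, 5, 6]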