Fixing tutorial with DataLoader
Updating the tutorial and README with more relevant/correct information. Minor fix to one part of `MapDataPipe` documentation as well.

Fixes #352
NivekT committed May 25, 2022
1 parent 4b5e1da commit e560ef7
Showing 3 changed files with 87 additions and 17 deletions.
21 changes: 17 additions & 4 deletions README.md
@@ -177,17 +177,30 @@ Q: What should I do if the existing set of DataPipes does not do what I need?
A: You can
[implement your own custom DataPipe](https://pytorch.org/data/main/tutorial.html#implementing-a-custom-datapipe). If you
believe your use case is common enough such that the community can benefit from having your custom DataPipe added to
-this library, feel free to open a GitHub issue.
this library, feel free to open a GitHub issue. We will be happy to discuss!
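
As a rough sketch of what a custom DataPipe can look like (the class name and prefix logic below are made up purely for illustration), you subclass `IterDataPipe` and implement `__iter__`:

```python
from torchdata.datapipes.iter import IterDataPipe

class PrefixerIterDataPipe(IterDataPipe):
    # Hypothetical example: prepend a fixed prefix to every string yielded by the source DataPipe
    def __init__(self, source_datapipe, prefix):
        self.source_datapipe = source_datapipe
        self.prefix = prefix

    def __iter__(self):
        for item in self.source_datapipe:
            yield self.prefix + item
```

A custom DataPipe like this composes with the built-in ones in the usual way, since it simply wraps another DataPipe as its source.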

-Q: What happens when the `Shuffler`/`Batcher` DataPipes are used with DataLoader?
Q: What happens when the `Shuffler` DataPipe is used with DataLoader?

-A: If you choose those DataPipes while setting `shuffle=True`/`batch_size>1` for DataLoader, your samples will be
-shuffled/batched more than once. You should choose one or the other.
A: To enable shuffling, add a `Shuffler` to your DataPipe line. By default, shuffling will then happen at the point
you specified, as long as you do not set `shuffle=False` within DataLoader.
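
A minimal sketch of that setup (the `IterableWrapper` source and the numbers here are just stand-ins for illustration):

```python
from torch.utils.data import DataLoader
import torchdata.datapipes as dp

datapipe = dp.iter.IterableWrapper(range(10))
datapipe = datapipe.shuffle()  # shuffling is defined here, inside the DataPipe line

# Leave `shuffle` at its default (i.e. do not pass shuffle=False) and the Shuffler above stays active
dl = DataLoader(datapipe, batch_size=2)
```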

Q: What happens when the `Batcher` DataPipe is used with DataLoader?

A: If you choose to use `Batcher` while setting `batch_size > 1` for DataLoader, your samples will be batched more than
once. You should choose one or the other.
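
For example, a small sketch (with an in-memory stand-in source) of keeping batching in exactly one place:

```python
from torch.utils.data import DataLoader
import torchdata.datapipes as dp

# Option 1: batch inside the DataPipe line and disable automatic batching in DataLoader
dp_batched = dp.iter.IterableWrapper(range(10)).batch(batch_size=2)
dl_pipeline_batched = DataLoader(dp_batched, batch_size=None)

# Option 2: leave the pipeline unbatched and let DataLoader do the batching
dp_plain = dp.iter.IterableWrapper(range(10))
dl_loader_batched = DataLoader(dp_plain, batch_size=2)

# Doing both (.batch(2) and batch_size=2) would batch each sample twice, yielding batches of batches
```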

Q: Why are there fewer built-in `MapDataPipes` than `IterDataPipes`?

A: By design, there are fewer `MapDataPipes` than `IterDataPipes` to avoid implementing the same functionality twice,
once for each type. We encourage users to use the built-in `IterDataPipe` for various functionalities, and
convert it to `MapDataPipe` as needed.

Q: How is multiprocessing handled with DataPipes?

A: Multi-process data loading is still handled by DataLoader, see the
[DataLoader documentation for more details](https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading).
If you would like to shard data across processes, use `ShardingFilter` and provide a `worker_init_fn` as shown in the
[tutorial](https://pytorch.org/data/beta/tutorial.html#working-with-dataloader).

Q: What is the upcoming plan for DataLoader?

6 changes: 3 additions & 3 deletions docs/source/torchdata.datapipes.map.rst
@@ -15,9 +15,9 @@ corresponding label from a folder on the disk.

By design, there are fewer ``MapDataPipe`` than ``IterDataPipe`` to avoid duplicate implementations of the same
functionalities as ``MapDataPipe``. We encourage users to use the built-in ``IterDataPipe`` for various functionalities,
-and convert it to ``MapDataPipe`` as needed using ``MapToIterConverter`` or ``.to_iter_datapipe()``.
-If you have any question about usage or best practices while using `MapDataPipe`, feel free to ask on the PyTorch forum
-under the `'data' category <https://discuss.pytorch.org/c/data/37>`_.
and convert it to ``MapDataPipe`` as needed using :class:`.IterToMapConverter` or ``.to_map_datapipe()``.
If you have any questions about usage or best practices while using ``MapDataPipe``, feel free to ask on the PyTorch
forum under the `'data' category <https://discuss.pytorch.org/c/data/37>`_.
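
For example, an ``IterDataPipe`` that yields key-value pairs can be materialized into a ``MapDataPipe`` (a minimal sketch with in-memory data, not taken from the docs):

.. code:: python

    from torchdata.datapipes.iter import IterableWrapper

    # IterToMapConverter expects (key, value) pairs from the source IterDataPipe
    iter_dp = IterableWrapper([(0, "cat"), (1, "dog"), (2, "bird")])
    map_dp = iter_dp.to_map_datapipe()

    print(map_dp[1])    # 'dog'
    print(len(map_dp))  # 3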

We are open to adding additional ``MapDataPipe`` where the operations can be lazily executed and ``__len__`` can be
known in advance. Feel free to make suggestions with a description of your use case in
77 changes: 67 additions & 10 deletions docs/source/tutorial.rst
@@ -80,23 +80,33 @@ For this example, we will first have a helper function that generates some CSV files
        row_data['label'] = random.randint(0, 9)
        writer.writerow(row_data)
-Next, we will build our DataPipes to read and parse through the generated CSV files:
Next, we will build our DataPipes to read and parse through the generated CSV files. Note that we prefer to pass
defined functions to DataPipes rather than lambda functions, because the former are serializable with ``pickle``.

.. code:: python

    import numpy as np
    import torchdata.datapipes as dp

    def filter_for_data(filename):
        return "sample_data" in filename and filename.endswith(".csv")

    def row_processer(row):
        return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}

    def build_datapipes(root_dir="."):
        datapipe = dp.iter.FileLister(root_dir)
-       datapipe = datapipe.filter(filter_fn=lambda filename: "sample_data" in filename and filename.endswith(".csv"))
-       datapipe = dp.iter.FileOpener(datapipe, mode='rt')
        datapipe = datapipe.filter(filter_fn=filter_for_data)
        datapipe = datapipe.open_files(mode='rt')
        datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
-       datapipe = datapipe.map(lambda row: {"label": np.array(row[0], np.int32),
-                                            "data": np.array(row[1:], dtype=np.float64)})
        # Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
        datapipe = datapipe.shuffle()
        datapipe = datapipe.map(row_processer)
        return datapipe
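
As a quick aside on why defined functions are preferred, here is a standalone sketch (separate from the tutorial pipeline, repeating ``filter_for_data`` so it is self-contained) showing that a module-level function pickles while a lambda does not:

.. code:: python

    import pickle

    def filter_for_data(filename):
        return "sample_data" in filename and filename.endswith(".csv")

    pickle.dumps(filter_for_data)  # works: module-level functions are pickled by reference

    try:
        pickle.dumps(lambda filename: "sample_data" in filename)
    except (pickle.PicklingError, AttributeError) as err:
        print(f"lambda could not be pickled: {err}")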
-Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader.
Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe into the DataLoader. Note that
if you choose to use ``Batcher`` while setting ``batch_size > 1`` for DataLoader, your samples will be
batched more than once. You should choose one or the other.

@@ -105,20 +115,67 @@ Lastly, we will put everything together in ``'__main__'`` and pass the DataPipe

.. code:: python

    if __name__ == '__main__':
        num_files_to_generate = 3
        for i in range(num_files_to_generate):
-           generate_csv(file_label=i)
            generate_csv(file_label=i, num_rows=10, num_features=3)
        datapipe = build_datapipes()
-       dl = DataLoader(dataset=datapipe, batch_size=50, shuffle=True)
        dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
        first = next(iter(dl))
        labels, features = first['label'], first['data']
        print(f"Labels batch shape: {labels.size()}")
        print(f"Feature batch shape: {features.size()}")
        print(f"{labels = }\n{features = }")
        n_sample = 0
        for row in iter(dl):
            n_sample += 1
        print(f"{n_sample = }")
The following statements will be printed to show the shapes and values of a single batch of labels and features, along
with the total number of batches.

.. code::

-   Labels batch shape: 50
-   Feature batch shape: torch.Size([50, 20])
    Labels batch shape: torch.Size([5])
    Feature batch shape: torch.Size([5, 3])
    labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
    features = tensor([[0.2867, 0.5973, 0.0730],
            [0.7890, 0.9279, 0.7392],
            [0.8930, 0.7434, 0.0780],
            [0.8225, 0.4047, 0.0800],
            [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
    n_sample = 12
``n_sample = 12`` because ``ShardingFilter`` (``datapipe.sharding_filter()``) was not used, so each worker
independently returns all of the samples. In this case, there are 10 rows per file and 3 files; with a batch size
of 5, that gives 6 batches per worker. With 2 workers, we get 12 total batches from the ``DataLoader``.

In order for DataPipe sharding to work with ``DataLoader``, we need to add the following. It is crucial to add
``ShardingFilter`` after ``Shuffler`` to ensure that all worker processes have the same order of data for sharding.

.. code:: python

    def build_datapipes(root_dir="."):
        datapipe = ...
        # Add the following line to `build_datapipes`
        # Note that it is somewhere after `Shuffler` in the DataPipe line
        datapipe = datapipe.sharding_filter()
        return datapipe

    def worker_init_fn(worker_id):
        info = torch.utils.data.get_worker_info()
        num_workers = info.num_workers
        datapipe = info.dataset
        torch.utils.data.graph_settings.apply_sharding(datapipe, num_workers, worker_id)

    # Pass `worker_init_fn` into `DataLoader` within '__main__'
    ...
    dl = DataLoader(dataset=datapipe, shuffle=True, batch_size=5, num_workers=2, worker_init_fn=worker_init_fn)
    ...
When we re-run, we will get:

.. code::
...
n_sample = 6
You can find more DataPipe implementation examples for various research domains `on this page <torchexamples.html>`_.

