diff --git a/docs/source/torchdata.datapipes.iter.rst b/docs/source/torchdata.datapipes.iter.rst index ca73dc136..578a29b42 100644 --- a/docs/source/torchdata.datapipes.iter.rst +++ b/docs/source/torchdata.datapipes.iter.rst @@ -181,6 +181,7 @@ A miscellaneous set of DataPipes with different functionalities. LengthSetter MapToIterConverter OnDiskCacheHolder + Prefetcher RandomSplitter ShardingFilter diff --git a/torchdata/datapipes/iter/util/prefetcher.py b/torchdata/datapipes/iter/util/prefetcher.py index d3b1003e9..8f8f4db0f 100644 --- a/torchdata/datapipes/iter/util/prefetcher.py +++ b/torchdata/datapipes/iter/util/prefetcher.py @@ -29,6 +29,28 @@ def __init__(self, source_datapipe, buffer_size): @functional_datapipe("prefetch") class PrefetcherIterDataPipe(IterDataPipe): + """ + Prefetches elements from the source DataPipe and puts them into a buffer (functional name: ``prefetch``). + Prefetching performs the operations (e.g. I/O, computations) of the DataPipes up to this one ahead of time + and stores the result in the buffer, ready to be consumed by the subsequent DataPipe. It has no effect aside + from getting the sample ready ahead of time. + + This is used by ``PrototypeMultiProcessingReadingService`` when the arguments + ``prefetch_worker`` (for prefetching at each worker process) or + ``prefetch_mainloop`` (for prefetching at the main loop) are greater than 0. + + Beyond the built-in use cases, this can be useful to put after I/O DataPipes that have + expensive I/O operations (e.g. takes a long time to request a file from a remote server). + + Args: + source_datapipe: IterDataPipe from which samples are prefetched + buffer_size: the size of the buffer which stores the prefetched samples + + Example: + >>> from torchdata.datapipes.iter import IterableWrapper + >>> dp = IterableWrapper(file_paths).open_files().prefetch(5) + """ + + def __init__(self, source_datapipe, buffer_size: int = 10): self.source_datapipe = source_datapipe if buffer_size <= 0: