
Add the functionality to read chunk by chunk #439

Open
NivekT opened this issue May 23, 2022 · 4 comments

Comments

@NivekT
Contributor

NivekT commented May 23, 2022

🚀 The feature

This issue is a continuation of the discussion from here.

We are proposing to add some functionality to allow reading chunk by chunk from a stream. Ideally, this should be done without loading the entire stream/response into memory first, but this may not be possible in all cases.

Motivation, pitch

From @pmeier

Having something like a ChunkReader datapipe can be useful in general, not just for HTTP requests, right?

from torchdata.datapipes.iter import IterDataPipe


class ChunkReader(IterDataPipe):
    def __init__(self, datapipe, *, chunk_size=32 * 1024 * 1024):
        self.datapipe = datapipe
        self.chunk_size = chunk_size  # default: 32 MB per chunk

    def __iter__(self):
        for path, stream in self.datapipe:
            # read until stream.read() returns the sentinel b""
            for chunk in iter(lambda: stream.read(self.chunk_size), b""):
                # filter out keep-alive chunks from HTTP streams
                if not chunk:
                    continue

                yield path, chunk

A utility like that is needed for Saver anyway. AFAIK, there is currently no built-in functionality to read data from a stream, correct? I'm guessing that is why the Saver example uses bytes as input.

This can be useful for reading from a file stream in chunks (e.g. after FileOpener), writing to files in chunks (e.g. Saver), and reading HTTP responses in chunks (e.g. HttpReader).
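For illustration, a hypothetical usage sketch of how such a ChunkReader could be chained after FileOpener (FileLister and FileOpener are the existing datapipes; the file mask and chaining here are made up for the example):

from torchdata.datapipes.iter import FileLister, FileOpener

# hypothetical usage: list binary files, open them as byte streams,
# and re-yield their contents chunk by chunk
dp = FileLister(root=".", masks="*.bin")
dp = FileOpener(dp, mode="b")
dp = ChunkReader(dp)
for path, chunk in dp:
    print(path, len(chunk))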

Alternatives

Instead of adding a new DataPipe, we can consider modifying specific existing DataPipes to add the functionality of reading by chunk. This may be useful in avoiding the need to read the entire stream/response into memory (as I believe is the case in the current implementation of HttpReader; please correct me if this is wrong).

We can also do both.
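To make this concrete, here is a minimal sketch of what streaming inside an HTTP-reading datapipe could look like, assuming the requests library; the class name StreamingHttpReader and its parameters are made up for illustration and are not the actual HttpReader implementation:

import requests

from torchdata.datapipes.iter import IterDataPipe


class StreamingHttpReader(IterDataPipe):  # hypothetical name, for illustration only
    def __init__(self, source_datapipe, *, chunk_size=32 * 1024 * 1024):
        self.source_datapipe = source_datapipe
        self.chunk_size = chunk_size

    def __iter__(self):
        for url in self.source_datapipe:
            # stream=True defers downloading the body until it is iterated over,
            # so the full response never has to be held in memory at once
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    if not chunk:  # filter out keep-alive chunks
                        continue
                    yield url, chunk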

Additional context

No response

cc: @ejguan @VitalyFedyunin

@ejguan
Contributor

ejguan commented May 23, 2022

We do have such a DataPipe:
https://github.com/pytorch/pytorch/blob/cac16e2ee293964033dffa6616f78b68603cd565/torch/utils/data/datapipes/iter/streamreader.py#L8-L9

@ejguan
Contributor

ejguan commented May 23, 2022

I am okay with name changes.

@pmeier
Contributor

pmeier commented May 24, 2022

We do have such a DataPipe

Oops, I was not aware of that. The implementation looks like this:

while True:
    d = stream.read(self.chunk)
    if not d:
        break
    yield (furl, d)

Compared to the proposal in #439 (comment), this is almost equivalent. The only difference is that StreamReader stops reading after the first empty chunk. HTTP responses sometimes include keep-alive chunks, i.e. empty chunks, in the middle of the response. Stopping after the first empty chunk possibly discards everything that comes afterwards.

Are you ok with me sending a PR to adapt StreamReader to also handle keep-alive chunks?

@NivekT
Contributor Author

NivekT commented May 24, 2022

Compared to the proposal in #439 (comment), this is almost equivalent. The only difference is that StreamReader stops reading after the first empty chunk. HTTP responses sometimes include keep-alive chunks, i.e. empty chunks, in the middle of the response. Stopping after the first empty chunk possibly discards everything that comes afterwards.

Are you ok with me sending a PR to adapt StreamReader to also handle keep-alive chunks?

That seems reasonable to me. We can have an argument break_if_empty=True, which can be set to False to handle keep-alive chunks:

def __iter__(self):
    for furl, stream in self.datapipe:
        while True:
            data = stream.read(self.chunk_size)
            if not data:
                if self.break_if_empty:
                    break  # treat the first empty read as end of stream
                else:
                    continue  # skip keep-alive chunks and keep reading
            yield furl, data

However, I think in that case, we will need some mechanism to terminate the iteration when self.break_if_empty=False?
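(For context, once a stream is exhausted, every subsequent read returns b"", so the continue branch alone would spin forever at end of stream; a tiny illustration:)

import io

s = io.BytesIO(b"")
print(s.read(4))  # b"" -- an exhausted stream returns b"" on every read
print(s.read(4))  # b"" again, so a bare `continue` would loop indefinitely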

In your code snippet, I think the iterator would end as soon as b"" is given. What is the typical value of a "keep-alive" chunk? If it is an empty string, then wouldn't the code below just terminate?

iter(lambda: stream.read(self.chunk_size), b"")

We can continue our discussion here:
pytorch/pytorch#78194
