You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
classStream(object):
"""Stream."""def__init__(self, response: Response, chunksize: int=65536) ->None:
"""Initialize Stream. Args: response: Streamed response. chunksize: Chunk size (used when there is no Content-Length header). """self._response=responseself._chunksize=chunksizedef__bool__(self) ->bool:
"""Stream as boolean. Needed for Session.request() which uses: data=data or dict(). (otherwise, would be considered False when length is 0). Returns: Always True. """returnTruedef__len__(self) ->int:
"""Get stream length. Returns: The stream length. Zero if there is no Content-Length header. """returnint(self._response.headers.get('Content-Length', '0'))
def__iter__(self) ->Iterator[bytes]:
"""Get an iterator of chunks of body. Returns: A bytes iterator. """returnself._response.raw.stream(self._chunksize) # type: ignoredefread(self, size: Optional[int] =-1) ->AnyStr:
"""Read stream. Args: size: Length to read. Returns: The read bytes/str. """returnself._response.raw.read(size) # type: ignore
I want to do this:
When the source has a content-length, this is easily done with:
But when there is no Content-Length, it's really harder ... (we need to implement bool, len, iter, ...).
Please provide an easy to use Streamer.
See https://gitlab.com/gitlabracadabra/gitlabracadabra/-/issues/37
The text was updated successfully, but these errors were encountered: