Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for streaming uploads #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion pycurl_requests/adapters/pycurl.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def send(self, request, stream=False, timeout=None, verify=True, cert=None, prox
return pycurl_request.send()


class ChunkIterableReader:
def __init__(self, iterator: Iterator[bytes]):
self._iterator = iterator

def read(self, ignored) -> bytes:
return bytes(next(self._iterator, b""))

def close(self): # TODO
try:
self._iterator.close()
except AttributeError:
pass
Comment on lines +93 to +97
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you should be closing the iterator from within the library. Typically I would expect a user to be using a with block or calling close manually.




class PyCurlRequest:
def __init__(self, prepared, *, curl=None, timeout=None, allow_redirects=True, max_redirects=-1):
self.prepared = prepared
Expand Down Expand Up @@ -167,13 +182,31 @@ def send(self):
if self.prepared.body is not None:
if isinstance(self.prepared.body, str):
body = io.BytesIO(self.prepared.body.encode('iso-8859-1'))
self.curl.setopt(pycurl.READDATA, body)
elif isinstance(self.prepared.body, bytes):
body = io.BytesIO(self.prepared.body)
self.curl.setopt(pycurl.READDATA, body)
elif isinstance(self.prepared.body, (io.RawIOBase, io.BufferedIOBase)):
self.curl.setopt(pycurl.READFUNCTION, self.prepared.body.read)
self.curl.setopt(pycurl.TRANSFER_ENCODING, 1)
elif hasattr(self.prepared.body, "__iter__"): # TODO: call iter instead of checking (e.g. to support delegates)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to use isinstance(obj, Collection).

collections.abc.Collection guarantees that a type implements __contains__, __iter__ and __len__. You can also use collections.abc.Iterable to specifically test for __iter__.

try:
n_bytes = len(self.prepared.body)
except TypeError:
# "(Since 7.66.0, libcurl will automatically use chunked encoding for POSTs if the size is unknown.)"
self.curl.setopt(pycurl.TRANSFER_ENCODING, 1)
else:
self.curl.setopt(pycurl.TRANSFER_ENCODING, 0)
self.curl.setopt(pycurl.INFILESIZE_LARGE, n_bytes)
reader = ChunkIterableReader(iter(self.prepared.body))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can use the two argument form of iter here:

iter(self.prepared.body, "") # for string
iter(self.prepared.body, b"") # for bytes

self.curl.setopt(pycurl.READFUNCTION, reader.read)
# TODO: throw exceptions to the iterator (requests doesn't do this but would facilitate error handling)
else:
body = self.prepared.body
self.curl.setopt(pycurl.READDATA, body)

self.curl.setopt(pycurl.UPLOAD, 1)
self.curl.setopt(pycurl.READDATA, body)


content_length = self.prepared.headers.get('Content-Length')
if content_length is not None:
Expand Down
38 changes: 38 additions & 0 deletions pycurl_requests/tests/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import io
from pycurl_requests import requests
from pycurl_requests.tests.utils import *


def test_streaming_upload_from_file(http_server):
f = io.BytesIO(test_data)
response = requests.post(http_server.base_url + '/stream', data=f)
assert response.status_code == 200


def data_generator(data: bytes, chunk_size: int):
i = 0
while True:
chunk = data[chunk_size * i: chunk_size * (i + 1)]
if len(chunk) == 0:
break
yield chunk
i += 1


def test_streaming_upload_form_iterable(http_server):
response = requests.post(http_server.base_url + '/stream', data=data_generator(test_data, 123))
assert response.status_code == 200


def test_streaming_upload_form_iterable_with_known_length(http_server):
class FixedLengthIterable:
data = test_data

def __len__(self):
return len(self.data)

def __iter__(self):
return data_generator(data=self.data, chunk_size=123)

response = requests.post(http_server.base_url + '/stream_no_chunked', data=FixedLengthIterable())
assert response.status_code == 200
44 changes: 43 additions & 1 deletion pycurl_requests/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import json
import random
import threading
import time
from http import cookies
Expand All @@ -13,13 +14,16 @@

from pycurl_requests import requests

__all__ = ['IS_PYCURL_REQUESTS', 'http_server']
__all__ = ['IS_PYCURL_REQUESTS', 'http_server', 'test_data']

#: Is this _really_ PyCurl-Requests?
#: Should be used when testing for PyCurl-Requests extensions.
IS_PYCURL_REQUESTS = requests.__name__ == 'pycurl_requests'


test_data = bytes(random.getrandbits(8) for _ in range(123456))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend using os.urandom here.



@pytest.fixture(scope='module')
def http_server():
httpd = HTTPServer(('127.0.0.1', 0), HTTPRequestHandler)
Expand Down Expand Up @@ -93,6 +97,44 @@ def do_GET_response_headers(self):
def do_HTTP_404(self):
self.send_error(404, 'Not Found')

def do_POST(self):
path = self.url.path[1:].replace('/', '_')
getattr(self, f'do_POST_{path}', self.do_HTTP_404)()

def do_POST_stream(self):
self.POST_stream_helper(allow_chunked=True)

def do_POST_stream_no_chunked(self):
self.POST_stream_helper(allow_chunked=False)

def POST_stream_helper(self, allow_chunked: bool):
if "Content-Length" in self.headers:
content_length = int(self.headers["Content-Length"])
body = self.rfile.read(content_length)
elif "Transfer-Encoding" in self.headers and "chunked" in self.headers["Transfer-Encoding"]:
if not allow_chunked:
self.response('This endpoint has chunked transfer deactivated.', status=(400, "Bad Request"))
return
body = b""
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Joining immutable bytes objects can be potentially expensive. bytearray is a better choice if you need something mutable.

while True:
line = self.rfile.readline()
chunk_length = int(line, 16)
if chunk_length != 0:
chunk = self.rfile.read(chunk_length)
body += chunk
self.rfile.readline()
if chunk_length == 0:
break
else:
self.response('Missing Content-Length or Transfer-Encoding header.', status=(400, "Bad Request"))
return

if body == test_data:
self.response('Upload succeeded.')
else:
self.response('Upload failed.', status=(400, "Bad Request"))


@property
def url(self):
if not hasattr(self, '_url'):
Expand Down