Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core decompress body #18581

Merged
merged 23 commits into from
May 13, 2021
  •  
  •  
  •  
65 changes: 61 additions & 4 deletions sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
# --------------------------------------------------------------------------
from typing import Any, Optional, AsyncIterator as AsyncIteratorType
from collections.abc import AsyncIterator
try:
import cchardet as chardet
except ImportError: # pragma: no cover
import chardet # type: ignore

import logging
import asyncio
import codecs
import aiohttp
from multidict import CIMultiDict
from requests.exceptions import StreamConsumedError
Expand Down Expand Up @@ -145,6 +150,11 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
:keyword str proxy: will define the proxy to use all the time
"""
await self.open()
try:
auto_decompress = self.session.auto_decompress
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
except AttributeError:
# if this is a custom session and there is no auto_decompress attribute, we assume no need to decompress
auto_decompress = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a comment as to why we need this. I know I would be confused unless I knew the history...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.


proxies = config.pop('proxies', None)
if proxies and 'proxy' not in config:
Expand Down Expand Up @@ -180,7 +190,9 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
allow_redirects=False,
**config
)
response = AioHttpTransportResponse(request, result, self.connection_config.data_block_size)
response = AioHttpTransportResponse(request, result,
self.connection_config.data_block_size,
decompress=not auto_decompress)
if not stream_response:
await response.load_body()
except aiohttp.client_exceptions.ClientResponseError as err:
Expand Down Expand Up @@ -250,21 +262,40 @@ class AioHttpTransportResponse(AsyncHttpResponse):
:type aiohttp_response: aiohttp.ClientResponse object
:param block_size: block size of data sent over connection.
:type block_size: int
:keyword bool decompress: If True which is default, will attempt to decode the body based
on the ‘content-encoding’ header.
"""
def __init__(self, request: HttpRequest, aiohttp_response: aiohttp.ClientResponse, block_size=None) -> None:
def __init__(self, request: HttpRequest,
aiohttp_response: aiohttp.ClientResponse,
block_size=None, **kwargs) -> None:
super(AioHttpTransportResponse, self).__init__(request, aiohttp_response, block_size=block_size)
# https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
self.status_code = aiohttp_response.status
self.headers = CIMultiDict(aiohttp_response.headers)
self.reason = aiohttp_response.reason
self.content_type = aiohttp_response.headers.get('content-type')
self._body = None
self._decompress = kwargs.pop("decompress", True)
xiangyan99 marked this conversation as resolved.
Show resolved Hide resolved
if len(kwargs) > 0:
raise TypeError("Got an unexpected keyword argument: {}".format(list(kwargs.keys())[0]))

def body(self) -> bytes:
"""Return the whole body as bytes in memory.
"""
if self._body is None:
raise ValueError("Body is not available. Call async method load_body, or do your call with stream=False.")
if not self._decompress:
return self._body
enc = self.headers.get('Content-Encoding')
if not enc:
return self._body
enc = enc.lower()
if enc in ("gzip", "deflate"):
import zlib
zlib_mode = 16 + zlib.MAX_WBITS if enc == "gzip" else zlib.MAX_WBITS
decompressor = zlib.decompressobj(wbits=zlib_mode)
body = decompressor.decompress(self._body)
return body
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have some concerns about us not caching the decompressed body. Because we only need it once, right? Do we have any other access to self._body that requires us to keep the compressed data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect (as least I did not see) users need to get body twice.

If you want, we can update the code like:

    if enc in ("gzip", "deflate"):
        if self._decompressed_body:
                 return self._decompressed_body
        import zlib
        zlib_mode = 16 + zlib.MAX_WBITS if enc == "gzip" else zlib.MAX_WBITS
        decompressor = zlib.decompressobj(wbits=zlib_mode)
        self._decompressed_body = decompressor.decompress(self._body)
        return self._decompressed_body
    return self._body

But to be honest, I don't see lots of value for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The don't need to get the body more than once. And it would not be clear to me that getting the body and then the text will decompress the body twice.

I don't think we need to keep the compressed data around once it has been decompressed, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fair. Updated. :)

return self._body

def text(self, encoding: Optional[str] = None) -> str:
Expand All @@ -274,10 +305,36 @@ def text(self, encoding: Optional[str] = None) -> str:

:param str encoding: The encoding to apply.
"""
# super().text detects charset based on self._body() which is compressed
# implement the decoding explicitly here
body = self.body()

ctype = self.headers.get(aiohttp.hdrs.CONTENT_TYPE, "").lower()
mimetype = aiohttp.helpers.parse_mimetype(ctype)

encoding = mimetype.parameters.get("charset")
if encoding:
try:
codecs.lookup(encoding)
except LookupError:
encoding = None
if not encoding:
if mimetype.type == "application" and (
mimetype.subtype == "json" or mimetype.subtype == "rdap"
):
# RFC 7159 states that the default encoding is UTF-8.
# RFC 7483 defines application/rdap+json
encoding = "utf-8"
elif body is None:
raise RuntimeError(
"Cannot guess the encoding of a not yet read body"
)
else:
encoding = chardet.detect(body)["encoding"]
annatisch marked this conversation as resolved.
Show resolved Hide resolved
if not encoding:
annatisch marked this conversation as resolved.
Show resolved Hide resolved
encoding = self.internal_response.get_encoding()
encoding = "utf-8-sig"

return super().text(encoding)
return body.decode(encoding)

async def load_body(self) -> None:
"""Load in memory the body, so it could be accessible from sync methods."""
Expand Down
39 changes: 39 additions & 0 deletions sdk/core/azure-core/tests/async_tests/test_universal_http_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,45 @@ async def test_aiohttp_response_text():
)
assert res.text(encoding) == '56', "Encoding {} didn't work".format(encoding)

@pytest.mark.asyncio
async def test_aiohttp_response_decompression():
res = _create_aiohttp_response(
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x00\x8d\x8d\xb1n\xc30\x0cD"
b"\xff\x85s\x14HVlY\xda\x8av.\n4\x1d\x9a\x8d\xa1\xe5D\x80m\x01\x12="
b"\x14A\xfe\xbd\x92\x81d\xceB\x1c\xef\xf8\x8e7\x08\x038\xf0\xa67Fj+"
b"\x946\x9d8\x0c4\x08{\x96(\x94mzkh\x1cM/a\x07\x94<\xb2\x1f>\xca8\x86"
b"\xd9\xff0\x15\xb6\x91\x8d\x12\xb2\x15\xd2\x1c\x95q\xbau\xba\xdbk"
b"\xd5(\xd9\xb5\xa7\xc2L\x98\xf9\x8d8\xc4\xe5U\xccV,3\xf2\x9a\xcb\xddg"
b"\xe4o\xc6T\xdeVw\x9dgL\x7f\xe0n\xc0\x91q\x02'w0b\x98JZe^\x89|\xce\x9b"
b"\x0e\xcbW\x8a\x97\xf4X\x97\xc8\xbf\xfeYU\x1d\xc2\x85\xfc\xf4@\xb7\xbe"
b"\xf7+&$\xf6\xa9\x8a\xcb\x96\xdc\xef\xff\xaa\xa1\x1c\xf9$\x01\x00\x00",
{'Content-Type': 'text/plain', 'Content-Encoding':"gzip"}
)
body = res.body()
expect = b'{"id":"e7877039-1376-4dcd-9b0a-192897cff780","createdDateTimeUtc":' \
b'"2021-05-07T17:35:36.3121065Z","lastActionDateTimeUtc":' \
b'"2021-05-07T17:35:36.3121069Z","status":"NotStarted",' \
b'"summary":{"total":0,"failed":0,"success":0,"inProgress":0,' \
b'"notYetStarted":0,"cancelled":0,"totalCharacterCharged":0}}'
assert res.body() == expect, "Decompression didn't work"

@pytest.mark.asyncio
async def test_aiohttp_response_decompression_negtive():
import zlib
res = _create_aiohttp_response(
b"\xff\x85s\x14HVlY\xda\x8av.\n4\x1d\x9a\x8d\xa1\xe5D\x80m\x01\x12="
b"\x14A\xfe\xbd\x92\x81d\xceB\x1c\xef\xf8\x8e7\x08\x038\xf0\xa67Fj+"
b"\x946\x9d8\x0c4\x08{\x96(\x94mzkh\x1cM/a\x07\x94<\xb2\x1f>\xca8\x86"
b"\xd9\xff0\x15\xb6\x91\x8d\x12\xb2\x15\xd2\x1c\x95q\xbau\xba\xdbk"
b"\xd5(\xd9\xb5\xa7\xc2L\x98\xf9\x8d8\xc4\xe5U\xccV,3\xf2\x9a\xcb\xddg"
b"\xe4o\xc6T\xdeVw\x9dgL\x7f\xe0n\xc0\x91q\x02'w0b\x98JZe^\x89|\xce\x9b"
b"\x0e\xcbW\x8a\x97\xf4X\x97\xc8\xbf\xfeYU\x1d\xc2\x85\xfc\xf4@\xb7\xbe"
b"\xf7+&$\xf6\xa9\x8a\xcb\x96\xdc\xef\xff\xaa\xa1\x1c\xf9$\x01\x00\x00",
{'Content-Type': 'text/plain', 'Content-Encoding':"gzip"}
)
with pytest.raises(zlib.error):
body = res.body()

def test_repr():
res = _create_aiohttp_response(
b'\xef\xbb\xbf56',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ interactions:
Accept:
- application/json;odata.metadata=none
User-Agent:
- azsdk-python-search-documents/11.1.1 Python/3.8.6 (Windows-10-10.0.19041-SP0)
- azsdk-python-search-documents/11.2.0b3 Python/3.8.6 (Windows-10-10.0.19041-SP0)
method: GET
uri: https://search6dc91ab1.search.windows.net/indexes('drgqefsg')/docs/$count?api-version=2020-06-30-Preview
response:
body:
string: "\uFEFF10"
string: !!binary |
H4sIAAAAAAAEAO29B2AcSZYlJi9tynt/SvVK1+B0oQiAYBMk2JBAEOzBiM3mkuwdaUcjKasqgcpl
VmVdZhZAzO2dvPfee++999577733ujudTif33/8/XGZkAWz2zkrayZ4hgKrIHz9+fB8/Iv7Hv/cf
3N35fwC74FOIBQAAAA==
headers:
cache-control: no-cache
content-encoding: gzip
content-length: '127'
content-type: text/plain
date: Thu, 25 Mar 2021 20:21:11 GMT
elapsed-time: '67'
date: Sat, 08 May 2021 06:20:16 GMT
elapsed-time: '152'
expires: '-1'
odata-version: '4.0'
pragma: no-cache
preference-applied: odata.include-annotations="*"
request-id: a406bb84-8da7-11eb-ac20-a0481ca055a9
request-id: 746a9a78-afc5-11eb-ad79-a0481ca055a9
strict-transport-security: max-age=15724800; includeSubDomains
vary: Accept-Encoding
status:
Expand Down
Loading