diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index bad5a9ef8e5..4ddc2d38e25 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -38,6 +38,7 @@ InvalidURL, ServerFingerprintMismatch, ) +from .compression_utils import HAS_BROTLI from .formdata import FormData from .helpers import ( BaseTimerContext, @@ -51,7 +52,6 @@ set_result, ) from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11, StreamWriter -from .http_parser import HAS_BROTLI from .log import client_logger from .streams import StreamReader from .typedefs import ( diff --git a/aiohttp/compression_utils.py b/aiohttp/compression_utils.py new file mode 100644 index 00000000000..8abc4fa7c3c --- /dev/null +++ b/aiohttp/compression_utils.py @@ -0,0 +1,148 @@ +import asyncio +import zlib +from concurrent.futures import Executor +from typing import Optional, cast + +try: + import brotli + + HAS_BROTLI = True +except ImportError: # pragma: no cover + HAS_BROTLI = False + +MAX_SYNC_CHUNK_SIZE = 1024 + + +def encoding_to_mode( + encoding: Optional[str] = None, + suppress_deflate_header: bool = False, +) -> int: + if encoding == "gzip": + return 16 + zlib.MAX_WBITS + + return -zlib.MAX_WBITS if suppress_deflate_header else zlib.MAX_WBITS + + +class ZlibBaseHandler: + def __init__( + self, + mode: int, + executor: Optional[Executor] = None, + max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, + ): + self._mode = mode + self._executor = executor + self._max_sync_chunk_size = max_sync_chunk_size + + +class ZLibCompressor(ZlibBaseHandler): + def __init__( + self, + encoding: Optional[str] = None, + suppress_deflate_header: bool = False, + level: Optional[int] = None, + wbits: Optional[int] = None, + strategy: int = zlib.Z_DEFAULT_STRATEGY, + executor: Optional[Executor] = None, + max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, + ): + super().__init__( + mode=encoding_to_mode(encoding, suppress_deflate_header) + if wbits is None + else wbits, + executor=executor, + max_sync_chunk_size=max_sync_chunk_size, + ) + if level is None: + self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy) + else: + self._compressor = zlib.compressobj( + wbits=self._mode, strategy=strategy, level=level + ) + + def compress_sync(self, data: bytes) -> bytes: + return self._compressor.compress(data) + + async def compress(self, data: bytes) -> bytes: + if ( + self._max_sync_chunk_size is not None + and len(data) > self._max_sync_chunk_size + ): + return await asyncio.get_event_loop().run_in_executor( + self._executor, self.compress_sync, data + ) + return self.compress_sync(data) + + def flush(self, mode: int = zlib.Z_FINISH) -> bytes: + return self._compressor.flush(mode) + + +class ZLibDecompressor(ZlibBaseHandler): + def __init__( + self, + encoding: Optional[str] = None, + suppress_deflate_header: bool = False, + executor: Optional[Executor] = None, + max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, + ): + super().__init__( + mode=encoding_to_mode(encoding, suppress_deflate_header), + executor=executor, + max_sync_chunk_size=max_sync_chunk_size, + ) + self._decompressor = zlib.decompressobj(wbits=self._mode) + + def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes: + return self._decompressor.decompress(data, max_length) + + async def decompress(self, data: bytes, max_length: int = 0) -> bytes: + if ( + self._max_sync_chunk_size is not None + and len(data) > self._max_sync_chunk_size + ): + return await asyncio.get_event_loop().run_in_executor( + self._executor, self.decompress_sync, data, max_length + ) + return self.decompress_sync(data, max_length) + + def flush(self, length: int = 0) -> bytes: + return ( + self._decompressor.flush(length) + if length > 0 + else self._decompressor.flush() + ) + + @property + def eof(self) -> bool: + return self._decompressor.eof + + @property + def unconsumed_tail(self) -> bytes: + return self._decompressor.unconsumed_tail + + @property + def unused_data(self) -> bytes: + return self._decompressor.unused_data + + +class BrotliDecompressor: + # Supports both 'brotlipy' and 'Brotli' packages + # since they share an import name. The top branches + # are for 'brotlipy' and bottom branches for 'Brotli' + def __init__(self) -> None: + if not HAS_BROTLI: + raise RuntimeError( + "The brotli decompression is not available. " + "Please install `Brotli` module" + ) + self._obj = brotli.Decompressor() + + def decompress_sync(self, data: bytes) -> bytes: + if hasattr(self._obj, "decompress"): + return cast(bytes, self._obj.decompress(data)) + return cast(bytes, self._obj.process(data)) + + def flush(self) -> bytes: + if hasattr(self._obj, "flush"): + return cast(bytes, self._obj.flush()) + return b"" diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index 9749de25514..aba1d233d39 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -3,7 +3,6 @@ import collections import re import string -import zlib from contextlib import suppress from enum import IntEnum from typing import ( @@ -18,7 +17,6 @@ Type, TypeVar, Union, - cast, ) from multidict import CIMultiDict, CIMultiDictProxy, istr @@ -26,6 +24,7 @@ from . import hdrs from .base_protocol import BaseProtocol +from .compression_utils import HAS_BROTLI, BrotliDecompressor, ZLibDecompressor from .helpers import NO_EXTENSIONS, BaseTimerContext from .http_exceptions import ( BadHttpMessage, @@ -41,14 +40,6 @@ from .streams import EMPTY_PAYLOAD, StreamReader from .typedefs import Final, RawHeaders -try: - import brotli - - HAS_BROTLI = True -except ImportError: # pragma: no cover - HAS_BROTLI = False - - __all__ = ( "HeadersParser", "HttpParser", @@ -868,34 +859,16 @@ def __init__(self, out: StreamReader, encoding: Optional[str]) -> None: self.encoding = encoding self._started_decoding = False + self.decompressor: Union[BrotliDecompressor, ZLibDecompressor] if encoding == "br": if not HAS_BROTLI: # pragma: no cover raise ContentEncodingError( "Can not decode content-encoding: brotli (br). " "Please install `Brotli`" ) - - class BrotliDecoder: - # Supports both 'brotlipy' and 'Brotli' packages - # since they share an import name. The top branches - # are for 'brotlipy' and bottom branches for 'Brotli' - def __init__(self) -> None: - self._obj = brotli.Decompressor() - - def decompress(self, data: bytes) -> bytes: - if hasattr(self._obj, "decompress"): - return cast(bytes, self._obj.decompress(data)) - return cast(bytes, self._obj.process(data)) - - def flush(self) -> bytes: - if hasattr(self._obj, "flush"): - return cast(bytes, self._obj.flush()) - return b"" - - self.decompressor = BrotliDecoder() + self.decompressor = BrotliDecompressor() else: - zlib_mode = 16 + zlib.MAX_WBITS if encoding == "gzip" else zlib.MAX_WBITS - self.decompressor = zlib.decompressobj(wbits=zlib_mode) + self.decompressor = ZLibDecompressor(encoding=encoding) def set_exception(self, exc: BaseException) -> None: self.out.set_exception(exc) @@ -916,10 +889,12 @@ def feed_data(self, chunk: bytes, size: int) -> None: ): # Change the decoder to decompress incorrectly compressed data # Actually we should issue a warning about non-RFC-compliant data. - self.decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + self.decompressor = ZLibDecompressor( + encoding=self.encoding, suppress_deflate_header=True + ) try: - chunk = self.decompressor.decompress(chunk) + chunk = self.decompressor.decompress_sync(chunk) except Exception: raise ContentEncodingError( "Can not decode content-encoding: %s" % self.encoding diff --git a/aiohttp/http_websocket.py b/aiohttp/http_websocket.py index bef6c99a676..ae45b473557 100644 --- a/aiohttp/http_websocket.py +++ b/aiohttp/http_websocket.py @@ -13,6 +13,7 @@ from typing import Any, Callable, List, Optional, Pattern, Set, Tuple, Union, cast from .base_protocol import BaseProtocol +from .compression_utils import ZLibCompressor, ZLibDecompressor from .helpers import NO_EXTENSIONS from .streams import DataQueue from .typedefs import Final @@ -278,7 +279,7 @@ def __init__( self._payload_length = 0 self._payload_length_flag = 0 self._compressed: Optional[bool] = None - self._decompressobj: Any = None # zlib.decompressobj actually + self._decompressobj: Optional[ZLibDecompressor] = None self._compress = compress def feed_eof(self) -> None: @@ -298,7 +299,7 @@ def feed_data(self, data: bytes) -> Tuple[bool, bytes]: def _feed_data(self, data: bytes) -> Tuple[bool, bytes]: for fin, opcode, payload, compressed in self.parse_frame(data): if compressed and not self._decompressobj: - self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + self._decompressobj = ZLibDecompressor(suppress_deflate_header=True) if opcode == WSMsgType.CLOSE: if len(payload) >= 2: close_code = UNPACK_CLOSE_CODE(payload[:2])[0] @@ -383,8 +384,9 @@ def _feed_data(self, data: bytes) -> Tuple[bool, bytes]: # Decompress process must to be done after all packets # received. if compressed: + assert self._decompressobj is not None self._partial.extend(_WS_DEFLATE_TRAILING) - payload_merged = self._decompressobj.decompress( + payload_merged = self._decompressobj.decompress_sync( self._partial, self._max_msg_size ) if self._decompressobj.unconsumed_tail: @@ -612,16 +614,16 @@ async def _send_frame( if (compress or self.compress) and opcode < 8: if compress: # Do not set self._compress if compressing is for this frame - compressobj = zlib.compressobj(level=zlib.Z_BEST_SPEED, wbits=-compress) + compressobj = ZLibCompressor(level=zlib.Z_BEST_SPEED, wbits=-compress) else: # self.compress if not self._compressobj: - self._compressobj = zlib.compressobj( + self._compressobj = ZLibCompressor( level=zlib.Z_BEST_SPEED, wbits=-self.compress ) compressobj = self._compressobj - message = compressobj.compress(message) - message = message + compressobj.flush( + message = await compressobj.compress(message) + message += compressobj.flush( zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH ) if message.endswith(_WS_DEFLATE_TRAILING): diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 73f0f96f0ae..8f2d9086b92 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -8,6 +8,7 @@ from .abc import AbstractStreamWriter from .base_protocol import BaseProtocol +from .compression_utils import ZLibCompressor from .helpers import NO_EXTENSIONS __all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11") @@ -43,7 +44,7 @@ def __init__( self.output_size = 0 self._eof = False - self._compress: Any = None + self._compress: Optional[ZLibCompressor] = None self._drain_waiter = None self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent @@ -63,8 +64,7 @@ def enable_chunking(self) -> None: def enable_compression( self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY ) -> None: - zlib_mode = 16 + zlib.MAX_WBITS if encoding == "gzip" else zlib.MAX_WBITS - self._compress = zlib.compressobj(wbits=zlib_mode, strategy=strategy) + self._compress = ZLibCompressor(encoding=encoding, strategy=strategy) def _write(self, chunk: bytes) -> None: size = len(chunk) @@ -93,7 +93,7 @@ async def write( chunk = chunk.cast("c") if self._compress is not None: - chunk = self._compress.compress(chunk) + chunk = await self._compress.compress(chunk) if not chunk: return @@ -138,9 +138,9 @@ async def write_eof(self, chunk: bytes = b"") -> None: if self._compress: if chunk: - chunk = self._compress.compress(chunk) + chunk = await self._compress.compress(chunk) - chunk = chunk + self._compress.flush() + chunk += self._compress.flush() if chunk and self.chunked: chunk_len = ("%x\r\n" % len(chunk)).encode("ascii") chunk = chunk_len + chunk + b"\r\n0\r\n\r\n" diff --git a/aiohttp/multipart.py b/aiohttp/multipart.py index e6308de461d..8012624afbb 100644 --- a/aiohttp/multipart.py +++ b/aiohttp/multipart.py @@ -27,6 +27,7 @@ from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping +from .compression_utils import ZLibCompressor, ZLibDecompressor from .hdrs import ( CONTENT_DISPOSITION, CONTENT_ENCODING, @@ -454,15 +455,15 @@ def decode(self, data: bytes) -> bytes: def _decode_content(self, data: bytes) -> bytes: encoding = self.headers.get(CONTENT_ENCODING, "").lower() - - if encoding == "deflate": - return zlib.decompress(data, -zlib.MAX_WBITS) - elif encoding == "gzip": - return zlib.decompress(data, 16 + zlib.MAX_WBITS) - elif encoding == "identity": + if encoding == "identity": return data - else: - raise RuntimeError(f"unknown content encoding: {encoding}") + if encoding in {"deflate", "gzip"}: + return ZLibDecompressor( + encoding=encoding, + suppress_deflate_header=True, + ).decompress_sync(data) + + raise RuntimeError(f"unknown content encoding: {encoding}") def _decode_content_transfer(self, data: bytes) -> bytes: encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower() @@ -914,7 +915,7 @@ class MultipartPayloadWriter: def __init__(self, writer: Any) -> None: self._writer = writer self._encoding: Optional[str] = None - self._compress: Any = None + self._compress: Optional[ZLibCompressor] = None self._encoding_buffer: Optional[bytearray] = None def enable_encoding(self, encoding: str) -> None: @@ -927,8 +928,11 @@ def enable_encoding(self, encoding: str) -> None: def enable_compression( self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY ) -> None: - zlib_mode = 16 + zlib.MAX_WBITS if encoding == "gzip" else -zlib.MAX_WBITS - self._compress = zlib.compressobj(wbits=zlib_mode, strategy=strategy) + self._compress = ZLibCompressor( + encoding=encoding, + suppress_deflate_header=True, + strategy=strategy, + ) async def write_eof(self) -> None: if self._compress is not None: @@ -944,7 +948,7 @@ async def write_eof(self) -> None: async def write(self, chunk: bytes) -> None: if self._compress is not None: if chunk: - chunk = self._compress.compress(chunk) + chunk = await self._compress.compress(chunk) if not chunk: return diff --git a/aiohttp/web_response.py b/aiohttp/web_response.py index 35f9d6422c6..0646139ce92 100644 --- a/aiohttp/web_response.py +++ b/aiohttp/web_response.py @@ -6,7 +6,6 @@ import math import time import warnings -import zlib from concurrent.futures import Executor from http import HTTPStatus from http.cookies import Morsel, SimpleCookie @@ -25,6 +24,7 @@ from . import hdrs, payload from .abc import AbstractStreamWriter +from .compression_utils import ZLibCompressor from .helpers import ( ETAG_ANY, PY_38, @@ -761,13 +761,6 @@ async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: return await super()._start(request) - def _compress_body(self, zlib_mode: int) -> None: - assert zlib_mode > 0 - compressobj = zlib.compressobj(wbits=zlib_mode) - body_in = self._body - assert body_in is not None - self._compressed_body = compressobj.compress(body_in) + compressobj.flush() - async def _do_start_compression(self, coding: ContentCoding) -> None: if self._body_payload or self._chunked: return await super()._do_start_compression(coding) @@ -775,26 +768,26 @@ async def _do_start_compression(self, coding: ContentCoding) -> None: if coding != ContentCoding.identity: # Instead of using _payload_writer.enable_compression, # compress the whole body - zlib_mode = ( - 16 + zlib.MAX_WBITS if coding == ContentCoding.gzip else zlib.MAX_WBITS + compressor = ZLibCompressor( + encoding=str(coding.value), + max_sync_chunk_size=self._zlib_executor_size, + executor=self._zlib_executor, ) - body_in = self._body - assert body_in is not None - if ( - self._zlib_executor_size is not None - and len(body_in) > self._zlib_executor_size - ): - await asyncio.get_event_loop().run_in_executor( - self._zlib_executor, self._compress_body, zlib_mode + assert self._body is not None + if self._zlib_executor_size is None and len(self._body) > 1024 * 1024: + warnings.warn( + "Synchronous compression of large response bodies " + f"({len(self._body)} bytes) might block the async event loop. " + "Consider providing a custom value to zlib_executor_size/" + "zlib_executor response properties or disabling compression on it." ) - else: - self._compress_body(zlib_mode) - - body_out = self._compressed_body - assert body_out is not None + self._compressed_body = ( + await compressor.compress(self._body) + compressor.flush() + ) + assert self._compressed_body is not None self._headers[hdrs.CONTENT_ENCODING] = coding.value - self._headers[hdrs.CONTENT_LENGTH] = str(len(body_out)) + self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body)) def json_response( diff --git a/tests/test_http_parser.py b/tests/test_http_parser.py index ad8ff3ad284..d0588c0356f 100644 --- a/tests/test_http_parser.py +++ b/tests/test_http_parser.py @@ -1107,7 +1107,7 @@ async def test_feed_data(self, stream) -> None: dbuf = DeflateBuffer(buf, "deflate") dbuf.decompressor = mock.Mock() - dbuf.decompressor.decompress.return_value = b"line" + dbuf.decompressor.decompress_sync.return_value = b"line" # First byte should be b'x' in order code not to change the decoder. dbuf.feed_data(b"xxxx", 4) @@ -1121,7 +1121,7 @@ async def test_feed_data_err(self, stream) -> None: exc = ValueError() dbuf.decompressor = mock.Mock() - dbuf.decompressor.decompress.side_effect = exc + dbuf.decompressor.decompress_sync.side_effect = exc with pytest.raises(http_exceptions.ContentEncodingError): # Should be more than 4 bytes to trigger deflate FSM error.