Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of WebSocketReader #8736

Merged
merged 18 commits into from
Aug 19, 2024
Merged

Conversation

bdraco
Copy link
Member

@bdraco bdraco commented Aug 19, 2024

Optimization

This is a paired down version of #8735 to better optimize for both the non-continuation and continuation case

Testing feed_data on a live HA instance. Testing was done on an Odroid N2+ SBC. Performance numbers will vary based on CPU and memory performance.

~6% faster for the continuation case (real world use case is listening to a unifi instance)
~34% faster for the non-continuation case (real world use case is reading Bluetooth data from a shelly device, web based terminal shows a similar improvement)

Improvement percentages will be less impressive on systems with faster CPU/memory performance.

related discussion #8258

benchmark for non-continuation case

import random
import zlib
from typing import Any
import timeit
import asyncio
import aiohttp
from aiohttp.http import WSMsgType
from aiohttp.http_websocket import (
    _WS_DEFLATE_TRAILING,
    PACK_LEN1,
    PACK_LEN2,
    PACK_LEN3,
    WebSocketReader,
    _websocket_mask,
)


def build_frame(
    message: Any,
    opcode: Any,
    use_mask: bool = False,
    noheader: bool = False,
    is_fin: bool = True,
    compress: bool = False,
):
    # Send a frame over the websocket with message as its payload.
    if compress:
        compressobj = zlib.compressobj(wbits=-9)
        message = compressobj.compress(message)
        message = message + compressobj.flush(zlib.Z_SYNC_FLUSH)
        if message.endswith(_WS_DEFLATE_TRAILING):
            message = message[:-4]
    msg_length = len(message)
    if use_mask:  # pragma: no cover
        mask_bit = 0x80
    else:
        mask_bit = 0

    if is_fin:
        header_first_byte = 0x80 | opcode
    else:
        header_first_byte = opcode

    if compress:
        header_first_byte |= 0x40

    if msg_length < 126:
        header = PACK_LEN1(header_first_byte, msg_length | mask_bit)
    elif msg_length < (1 << 16):  # pragma: no cover
        header = PACK_LEN2(header_first_byte, 126 | mask_bit, msg_length)
    else:
        header = PACK_LEN3(header_first_byte, 127 | mask_bit, msg_length)

    if use_mask:  # pragma: no cover
        mask = random.randrange(0, 0xFFFFFFFF)
        mask = mask.to_bytes(4, "big")
        message = bytearray(message)
        _websocket_mask(mask, message)
        if noheader:
            return message
        else:
            return header + mask + message
    else:
        if noheader:
            return message
        else:
            return header + message


def out(loop):
    return aiohttp.DataQueue(loop)


def parser(out):
    return WebSocketReader(out, 4 * 1024 * 1024)


async def bench():
    data = build_frame(b"thisisthebinaryframe" * 256, WSMsgType.BINARY)
    loop = asyncio.get_running_loop()
    out_queue = out(loop)
    parser_instance = parser(out_queue)
    print(
        timeit.timeit(
            "parser_instance.feed_data(data)", globals=locals(), number=1000000
        )
    )


asyncio.run(bench())

profiler

import random
import zlib
from typing import Any
import asyncio
import aiohttp
from aiohttp.http import WSMsgType
import cProfile
from aiohttp.http_websocket import (
    _WS_DEFLATE_TRAILING,
    PACK_LEN1,
    PACK_LEN2,
    PACK_LEN3,
    WebSocketReader,
    _websocket_mask,
)


def build_frame(
    message: Any,
    opcode: Any,
    use_mask: bool = False,
    noheader: bool = False,
    is_fin: bool = True,
    compress: bool = False,
):
    # Send a frame over the websocket with message as its payload.
    if compress:
        compressobj = zlib.compressobj(wbits=-9)
        message = compressobj.compress(message)
        message = message + compressobj.flush(zlib.Z_SYNC_FLUSH)
        if message.endswith(_WS_DEFLATE_TRAILING):
            message = message[:-4]
    msg_length = len(message)
    if use_mask:  # pragma: no cover
        mask_bit = 0x80
    else:
        mask_bit = 0

    if is_fin:
        header_first_byte = 0x80 | opcode
    else:
        header_first_byte = opcode

    if compress:
        header_first_byte |= 0x40

    if msg_length < 126:
        header = PACK_LEN1(header_first_byte, msg_length | mask_bit)
    elif msg_length < (1 << 16):  # pragma: no cover
        header = PACK_LEN2(header_first_byte, 126 | mask_bit, msg_length)
    else:
        header = PACK_LEN3(header_first_byte, 127 | mask_bit, msg_length)

    if use_mask:  # pragma: no cover
        mask = random.randrange(0, 0xFFFFFFFF)
        mask = mask.to_bytes(4, "big")
        message = bytearray(message)
        _websocket_mask(mask, message)
        if noheader:
            return message
        else:
            return header + mask + message
    else:
        if noheader:
            return message
        else:
            return header + message


def out(loop):
    return aiohttp.DataQueue(loop)


def parser(out):
    return WebSocketReader(out, 4 * 1024 * 1024)


async def bench():
    data = build_frame(b"thisisthebinaryframe" * 256, WSMsgType.BINARY)
    loop = asyncio.get_running_loop()
    out_queue = out(loop)
    parser_instance = parser(out_queue)
    pr = cProfile.Profile()
    pr.enable()
    for _ in range(1000000):
        parser_instance.feed_data(data)
    pr.disable()
    pr.create_stats()
    pr.dump_stats("feed.cprof")


asyncio.run(bench())

this is a paired down version of #8735 to better
optimize for both the non-continuation and
continuation case
@bdraco bdraco added backport-3.10 Trigger automatic backporting to the 3.10 release branch by Patchback robot backport-3.11 Trigger automatic backporting to the 3.11 release branch by Patchback robot labels Aug 19, 2024
Copy link

codecov bot commented Aug 19, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.13%. Comparing base (1eb6519) to head (7fc3ffb).
Report is 1034 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8736      +/-   ##
==========================================
- Coverage   98.14%   98.13%   -0.01%     
==========================================
  Files         107      107              
  Lines       34062    34065       +3     
  Branches     4029     4030       +1     
==========================================
+ Hits        33430    33431       +1     
- Misses        456      457       +1     
- Partials      176      177       +1     
Flag Coverage Δ
CI-GHA 98.03% <100.00%> (-0.01%) ⬇️
OS-Linux 97.69% <100.00%> (-0.01%) ⬇️
OS-Windows 96.06% <98.07%> (-0.01%) ⬇️
OS-macOS 97.34% <98.07%> (-0.02%) ⬇️
Py-3.10.11 97.43% <98.07%> (-0.02%) ⬇️
Py-3.10.14 97.38% <100.00%> (+<0.01%) ⬆️
Py-3.11.9 97.62% <100.00%> (-0.01%) ⬇️
Py-3.12.4 97.62% <100.00%> (-0.12%) ⬇️
Py-3.12.5 97.21% <100.00%> (?)
Py-3.8.10 95.70% <94.23%> (-0.02%) ⬇️
Py-3.8.18 97.20% <96.15%> (-0.01%) ⬇️
Py-3.9.13 97.33% <94.23%> (-0.02%) ⬇️
Py-3.9.19 97.29% <96.15%> (-0.02%) ⬇️
Py-pypy7.3.16 96.90% <96.15%> (-0.01%) ⬇️
VM-macos 97.34% <98.07%> (-0.02%) ⬇️
VM-ubuntu 97.69% <100.00%> (-0.01%) ⬇️
VM-windows 96.06% <98.07%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Aug 19, 2024
@bdraco bdraco marked this pull request as ready for review August 19, 2024 16:18
@bdraco bdraco merged commit 1b88af2 into master Aug 19, 2024
37 of 38 checks passed
@bdraco bdraco deleted the optimize_websocket_reader branch August 19, 2024 16:33
Copy link
Contributor

patchback bot commented Aug 19, 2024

Backport to 3.10: 💔 cherry-picking failed — conflicts found

❌ Failed to cleanly apply 1b88af2 on top of patchback/backports/3.10/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736

Backporting merged PR #8736 into master

  1. Ensure you have a local repo clone of your fork. Unless you cloned it
    from the upstream, this would be your origin remote.
  2. Make sure you have an upstream repo added as a remote too. In these
    instructions you'll refer to it by the name upstream. If you don't
    have it, here's how you can add it:
    $ git remote add upstream https://github.com/aio-libs/aiohttp.git
  3. Ensure you have the latest copy of upstream and prepare a branch
    that will hold the backported code:
    $ git fetch upstream
    $ git checkout -b patchback/backports/3.10/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736 upstream/3.10
  4. Now, cherry-pick PR Improve performance of WebSocketReader #8736 contents into that branch:
    $ git cherry-pick -x 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0
    If it'll yell at you with something like fatal: Commit 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0 is a merge but no -m option was given., add -m 1 as follows instead:
    $ git cherry-pick -m1 -x 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0
  5. At this point, you'll probably encounter some merge conflicts. You must
    resolve them in to preserve the patch from PR Improve performance of WebSocketReader #8736 as close to the
    original as possible.
  6. Push this branch to your fork on GitHub:
    $ git push origin patchback/backports/3.10/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736
  7. Create a PR, ensure that the CI is green. If it's not — update it so that
    the tests and any other checks pass. This is it!
    Now relax and wait for the maintainers to process your pull request
    when they have some cycles to do reviews. Don't worry — they'll tell you if
    any improvements are necessary when the time comes!

🤖 @patchback
I'm built with octomachinery and
my source is open — https://github.com/sanitizers/patchback-github-app.

Copy link
Contributor

patchback bot commented Aug 19, 2024

Backport to 3.11: 💔 cherry-picking failed — conflicts found

❌ Failed to cleanly apply 1b88af2 on top of patchback/backports/3.11/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736

Backporting merged PR #8736 into master

  1. Ensure you have a local repo clone of your fork. Unless you cloned it
    from the upstream, this would be your origin remote.
  2. Make sure you have an upstream repo added as a remote too. In these
    instructions you'll refer to it by the name upstream. If you don't
    have it, here's how you can add it:
    $ git remote add upstream https://github.com/aio-libs/aiohttp.git
  3. Ensure you have the latest copy of upstream and prepare a branch
    that will hold the backported code:
    $ git fetch upstream
    $ git checkout -b patchback/backports/3.11/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736 upstream/3.11
  4. Now, cherry-pick PR Improve performance of WebSocketReader #8736 contents into that branch:
    $ git cherry-pick -x 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0
    If it'll yell at you with something like fatal: Commit 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0 is a merge but no -m option was given., add -m 1 as follows instead:
    $ git cherry-pick -m1 -x 1b88af2c3f5e5f992d0015f90927fd9c3e00bef0
  5. At this point, you'll probably encounter some merge conflicts. You must
    resolve them in to preserve the patch from PR Improve performance of WebSocketReader #8736 as close to the
    original as possible.
  6. Push this branch to your fork on GitHub:
    $ git push origin patchback/backports/3.11/1b88af2c3f5e5f992d0015f90927fd9c3e00bef0/pr-8736
  7. Create a PR, ensure that the CI is green. If it's not — update it so that
    the tests and any other checks pass. This is it!
    Now relax and wait for the maintainers to process your pull request
    when they have some cycles to do reviews. Don't worry — they'll tell you if
    any improvements are necessary when the time comes!

🤖 @patchback
I'm built with octomachinery and
my source is open — https://github.com/sanitizers/patchback-github-app.

bdraco added a commit that referenced this pull request Aug 19, 2024
bdraco added a commit that referenced this pull request Aug 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-3.10 Trigger automatic backporting to the 3.10 release branch by Patchback robot backport-3.11 Trigger automatic backporting to the 3.11 release branch by Patchback robot bot:chronographer:provided There is a change note present in this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants