anyio integration #1157

Merged: 71 commits, Jun 18, 2021
Changes shown below are from 11 of the 71 commits.

Commits
d06f40c
First whack at anyio integration
uSpike Mar 26, 2021
75310b5
Fix formatting
uSpike Mar 26, 2021
a660684
Remove debug messages
uSpike Mar 26, 2021
42b83cb
mypy fixes
uSpike Mar 26, 2021
9870a1f
Update README.md
uSpike Mar 27, 2021
6997eb9
Fix install_requires typo
uSpike Mar 27, 2021
e1c2adb
move_on_after blocks if deadline is too small
uSpike Mar 27, 2021
de84b4a
Linter fixes
uSpike Mar 27, 2021
e91ec33
Improve WSGI structured concurrency
uSpike Mar 27, 2021
7e2cd46
Tests use anyio
uSpike Mar 27, 2021
03e312e
Checkin progress on testclient
uSpike Mar 27, 2021
fd4569e
Prep for anyio 3
uSpike Mar 27, 2021
d785513
Remove debug backend option
uSpike Mar 27, 2021
58d5331
Use anyio 3.0.0rc1
uSpike Mar 27, 2021
268547d
Remove old style executor from GraphQLApp
uSpike Mar 27, 2021
57b2f79
Fix extra import
uSpike Mar 27, 2021
444a3ac
Don't cancel task scope early
uSpike Mar 27, 2021
4d31a60
Wait for wsgi sender to finish before exiting
uSpike Mar 27, 2021
681c348
Use memory object streams in websocket tests
uSpike Mar 27, 2021
01dd813
Test on asyncio, asyncio+uvloop, and trio
uSpike Mar 27, 2021
9f76d42
Formatting fixes
uSpike Mar 27, 2021
5c8818d
run_until_first_complete doesn't need a return
uSpike Mar 28, 2021
f0e4cd8
Fix middleware app call
uSpike Mar 28, 2021
376f9db
Simplify middleware exceptions
uSpike Mar 28, 2021
34da2b4
Use anyio for websocket test
uSpike Mar 28, 2021
31cc220
Set STARLETTE_TESTCLIENT_ASYNC_BACKEND in tests
uSpike Mar 28, 2021
73590aa
Pass async backend to portal
uSpike Mar 28, 2021
4192bf7
Formatting fixes
uSpike Mar 28, 2021
3a4b472
Bump anyio
uSpike Mar 29, 2021
cc3be48
Cleanup portals and add TestClient.async_backend
uSpike Mar 29, 2021
9b6e722
Use anyio.run_async_from_thread to send from worker thread
uSpike Mar 29, 2021
b8c43cf
Use websocket_connect as context manager
uSpike Mar 29, 2021
d51d5ff
Document changes in TestClient
uSpike Mar 29, 2021
82431f4
Formatting fix
uSpike Mar 29, 2021
2504237
Fix websocket raises coverage
uSpike Mar 29, 2021
87c614c
Merge branch 'master' into anyio
uSpike Mar 29, 2021
cf915bc
Update to anyio 3.0.0rc3 and replace aiofiles
uSpike Mar 30, 2021
72586ba
Apply suggestions from code review
uSpike Apr 10, 2021
1800f7a
Merge branch 'master' of github.com:encode/starlette into anyio
uSpike Apr 21, 2021
89e2dae
Bump to require anyio 3.0.0 final
uSpike Apr 21, 2021
f62a2ec
Remove mention of aiofiles in README.md
uSpike Apr 21, 2021
edba5dc
Merge branch 'master' into anyio
uSpike May 3, 2021
60d95e1
Merge branch 'master' into anyio
uSpike May 7, 2021
fc60420
Pin jinja2 to releases before 3 due to DeprecationWarnings
uSpike May 12, 2021
27283aa
Add task_group as application attribute
uSpike May 13, 2021
3cce6a9
Remove run_until_first_complete
uSpike May 13, 2021
cbc2e68
Merge branch 'master' of github.com:encode/starlette into anyio
uSpike May 13, 2021
c4d49a7
Undo jinja pin
uSpike May 13, 2021
4dd8c5d
Refactor anyio.sleep into an event
uSpike May 13, 2021
dde5079
Use one less task in test_websocket_concurrency_pattern
uSpike May 13, 2021
df53965
Apply review suggestions
uSpike May 13, 2021
6e0f05f
Rename argument
uSpike May 13, 2021
3a359e3
fix start_task_soon type
graingert May 24, 2021
5c77b7d
fix BaseHTTPMiddleware when used without Starlette
graingert May 24, 2021
390b7a1
Merge pull request #1 from graingert/anyio
uSpike May 25, 2021
e420181
Merge branch 'master' of github.com:encode/starlette into anyio
uSpike May 25, 2021
6a3f94d
Testclient receive() is a non-trapping function if the response is al…
uSpike May 27, 2021
0c225a3
Merge branch 'master' of github.com:encode/starlette into anyio
uSpike May 27, 2021
5667a4b
Use variable annotation for async_backend
uSpike May 27, 2021
19685db
Update docs regarding dependency on anyio
uSpike May 27, 2021
4e43146
Merge changes from master
uSpike May 27, 2021
a1ceb35
Use CancelScope instead of move_on_after in request.is_disconnected
uSpike May 27, 2021
63cfcb9
Cancel task group after returning middleware response
uSpike Jun 13, 2021
efbe6a1
Merge branch 'master' of github.com:encode/starlette into anyio
uSpike Jun 13, 2021
6208ca5
Add link to anyio backend options in testclient docs
uSpike Jun 13, 2021
8e6115b
Merge branch 'master' into anyio
uSpike Jun 13, 2021
e0c9967
Add types-dataclasses
uSpike Jun 13, 2021
27ec6f7
Merge branch 'master' into anyio
uSpike Jun 13, 2021
2b9dd22
Re-implement starlette.concurrency.run_until_first_complete and add a…
uSpike Jun 17, 2021
643d107
Fix type on handler callable
uSpike Jun 17, 2021
d0ca3f2
Apply review comments to clarify run_until_first_complete scope
uSpike Jun 18, 2021
7 changes: 4 additions & 3 deletions README.md
@@ -24,7 +24,7 @@
# Starlette

Starlette is a lightweight [ASGI](https://asgi.readthedocs.io/en/latest/) framework/toolkit,
which is ideal for building high performance asyncio services.
which is ideal for building high performance async services.

It is production-ready, and gives you the following:

@@ -38,7 +38,8 @@ It is production-ready, and gives you the following:
* Session and Cookie support.
* 100% test coverage.
* 100% type annotated codebase.
* Zero hard dependencies.
* Few hard dependencies.
* Compatible with `asyncio` and `trio` backends.

## Requirements

@@ -86,7 +87,7 @@ For a more complete example, see [encode/starlette-example](https://github.com/e

## Dependencies

Starlette does not have any hard dependencies, but the following are optional:
Starlette only requires `anyio`, and the following are optional:

* [`requests`][requests] - Required if you want to use the `TestClient`.
* [`aiofiles`][aiofiles] - Required if you want to use `FileResponse` or `StaticFiles`.
1 change: 1 addition & 0 deletions setup.py
@@ -48,6 +48,7 @@ def get_packages(package):
packages=get_packages("starlette"),
package_data={"starlette": ["py.typed"]},
include_package_data=True,
install_requires=["anyio<3,>=2"],
extras_require={
"full": [
"aiofiles",
6 changes: 5 additions & 1 deletion starlette/applications.py
@@ -1,5 +1,7 @@
import typing

import anyio

from starlette.datastructures import State, URLPath
from starlette.exceptions import ExceptionMiddleware
from starlette.middleware import Middleware
@@ -109,7 +111,9 @@ def url_path_for(self, name: str, **path_params: str) -> URLPath:

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
scope["app"] = self
await self.middleware_stack(scope, receive, send)
task_group = scope["task_group"] = anyio.create_task_group()

Member:

Would be interesting to know what quart chooses to do w.r.t. a request/response-wide task group. E.g. if you're running under trio, do they provide an anyio TaskGroup or a trio Nursery?

Member Author:

I've explored this elsewhere, and for an application lifespan to run background tasks with structured concurrency, it'd be better to have an app-wide task group from which all other tasks are spawned:

diff --git a/starlette/applications.py b/starlette/applications.py
index 10bd075..168b78c 100644
--- a/starlette/applications.py
+++ b/starlette/applications.py
@@ -1,6 +1,7 @@
 import typing
 
 import anyio
+from anyio.abc import TaskGroup
 
 from starlette.datastructures import State, URLPath
 from starlette.exceptions import ExceptionMiddleware
@@ -37,6 +38,7 @@ class Starlette:
     Shutdown handler callables do not take any arguments, and may be be either
     standard functions, or async functions.
     """
+    task_group: TaskGroup
 
     def __init__(
         self,
@@ -111,8 +113,8 @@ class Starlette:
 
     async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
         scope["app"] = self
-        task_group = scope["task_group"] = anyio.create_task_group()
-        async with task_group:
+        self.task_group = anyio.create_task_group()
+        async with self.task_group:
             await self.middleware_stack(scope, receive, send)
 
     # The following usages are now discouraged in favour of configuration
diff --git a/starlette/middleware/base.py b/starlette/middleware/base.py
index 16d5064..4a7e58f 100644
--- a/starlette/middleware/base.py
+++ b/starlette/middleware/base.py
@@ -29,7 +29,7 @@ class BaseHTTPMiddleware:
     async def call_next(self, request: Request) -> Response:
         send_stream, recv_stream = anyio.create_memory_object_stream()
         scope = request.scope
-        task_group = scope["task_group"]
+        task_group = scope["app"].task_group
 
         async def coro() -> None:
             async with send_stream:

This then allows you to write a lifespan generator or startup/shutdown events such as:

async def lifespan(app: Starlette):
    app.task_group.start_soon(background_task)
    ...

The only issue is, what happens if a task in that task group raises an error?

Member (@graingert, May 13, 2021):

The only issue is, what happens if a task in that task group raises an error?

The lifespan task is cancelled and the server is shut down. For the apps I write this is the desired behaviour.

From the ASGI lifespan spec (https://github.com/django/asgiref/blob/main/specs/lifespan.rst#scope): "If an exception is raised when calling the application callable with a lifespan.startup message or a scope with type lifespan, the server must continue but not send any lifespan events."

Other users of Starlette might want a task wrapper that catches Exceptions and logs them instead.
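
A minimal sketch of such a wrapper, assuming an anyio-style start_soon API; the log_exceptions helper name is made up for illustration and is not part of this PR:

import logging

logger = logging.getLogger(__name__)


def log_exceptions(func):
    # Hypothetical helper: log failures instead of letting them cancel the
    # app-wide task group (and with it the lifespan).
    async def wrapper(*args):
        try:
            await func(*args)
        except Exception:
            logger.exception("Background task %r failed", func)

    return wrapper


# Usage inside a startup handler, given an app-wide task group:
#     app.task_group.start_soon(log_exceptions(background_task))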

Member:

here's a little demo of what quart_trio does:

import httpx
import anyio
import quart
import quart_trio

import logging

logger = logging.getLogger(__name__)

blueprint = quart.Blueprint("blueprint", __name__)


class MyCrashError(Exception):
    pass


@blueprint.route("/")
async def index():
    async def crash():
        raise MyCrashError

    quart.current_app.nursery.start_soon(crash)
    return b""


def create_app():
    app = quart_trio.QuartTrio(__name__)
    app.register_blueprint(blueprint)
    return app


async def amain():
    async with anyio.create_task_group() as tg:
        tg.start_soon(create_app().run_task)
        async with httpx.AsyncClient() as client:
            await client.get("http://localhost:5000/")
            (await client.get("http://localhost:5000/")).raise_for_status()


anyio.run(amain, backend="trio")
[2021-05-13 19:04:22,398] Running on http://127.0.0.1:5000 (CTRL + C to quit)
[2021-05-13 19:04:22,414] 127.0.0.1:56808 GET / 1.1 200 0 2337
[2021-05-13 19:04:22,416] ERROR in app: Exception on request GET /
Traceback (most recent call last):
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart_trio/app.py", line 131, in handle_request
    return await self.full_dispatch_request(request_context)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart_trio/app.py", line 153, in full_dispatch_request
    result = await self.handle_user_exception(error)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart_trio/app.py", line 168, in handle_user_exception
    return await super().handle_user_exception(error)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart/app.py", line 943, in handle_user_exception
    raise error
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart_trio/app.py", line 151, in full_dispatch_request
    result = await self.dispatch_request(request_context)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart/app.py", line 1511, in dispatch_request
    return await self.ensure_async(handler)(**request_.view_args)
  File "/home/graingert/projects/demo-quart-trio/foo.py", line 22, in index
    quart.current_app.nursery.start_soon(crash)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/trio/_core/_run.py", line 988, in start_soon
    GLOBAL_RUN_CONTEXT.runner.spawn_impl(async_fn, args, self, name)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/trio/_core/_run.py", line 1429, in spawn_impl
    raise RuntimeError("Nursery is closed to new arrivals")
RuntimeError: Nursery is closed to new arrivals
[2021-05-13 19:04:22,414] ASGI Framework Lifespan error, continuing without Lifespan support
Traceback (most recent call last):
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/hypercorn/trio/lifespan.py", line 29, in handle_lifespan
    await invoke_asgi(self.app, scope, self.asgi_receive, self.asgi_send)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/hypercorn/utils.py", line 239, in invoke_asgi
    await app(scope, receive, send)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart/app.py", line 1697, in __call__
    await self.asgi_app(scope, receive, send)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart/app.py", line 1723, in asgi_app
    await asgi_handler(receive, send)
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/quart_trio/asgi.py", line 148, in __call__
    break
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/trio/_core/_run.py", line 815, in __aexit__
    raise combined_error_from_nursery
  File "/home/graingert/projects/demo-quart-trio/foo.py", line 20, in crash
    raise MyCrashError
MyCrashError
[2021-05-13 19:04:22,419] 127.0.0.1:56808 GET / 1.1 500 290 2909
Traceback (most recent call last):
  File "/home/graingert/projects/demo-quart-trio/foo.py", line 40, in <module>
    anyio.run(amain, backend="trio")
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 55, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/trio/_core/_run.py", line 1932, in run
    raise runner.main_task_outcome.error
  File "/home/graingert/projects/demo-quart-trio/foo.py", line 37, in amain
    (await client.get("http://localhost:5000/")).raise_for_status()
  File "/home/graingert/.virtualenvs/testing39/lib/python3.9/site-packages/httpx/_models.py", line 1405, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: 500 Server Error:  for url: http://localhost:5000/
For more information check: https://httpstatuses.com/500

Member Author:

@graingert that's interesting. A similar test app in Starlette, run under uvicorn, returns the "/" response without error but raises an exception on the server.

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

async def crash():
    raise Exception()

async def home(request):
    request.app.task_group.start_soon(crash)
    return JSONResponse({"ok": "True"})

app = Starlette(routes=[Route("/", home)])
INFO:     127.0.0.1:38388 - "GET / HTTP/1.1" 200 OK
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/jordan/src/starlette/venv/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/jordan/src/starlette/venv/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "./starlette/applications.py", line 120, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/jordan/src/starlette/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 528, in __aexit__
    raise exceptions[0]
  File "/home/jordan/src/starlette/venv/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 559, in _run_wrapped_task
    await coro
  File "./testapp.py", line 6, in crash
    raise Exception()
Exception

Using hypercorn, it crashes and never responds.

Member:

@uSpike I think this task_group should be scoped to the dispatch, and not attached to the app: uSpike#1

Member Author:

@graingert yep that makes sense to me, I'll get that in today.

async with task_group:
await self.middleware_stack(scope, receive, send)

Member:

Something we need to be aware of here, that's kinda silly, but also is at least worth pointing out...

This is on the critical path, so it'll have some impact on the benchmarking.

Now, I'm acutely aware that the micro-benchmarking is a big ol' pile of nonsense. But also it does, tediously, lead some folks into particular evaluations.

When I started Starlette one of the design arguments that I felt needed to be knocked down was that integrated server+application frameworks would be necessarily "faster" than frameworks with a clean server / application interface split. In this case ASGI.

To that extent it was important that Starlette ought to be able to equal sanic or aiohttp when evaluated with microbenchmarks, however tedious that might be.

Adopting this would likely change that.

Which might well be okay, but it's something we need to consider.

Member Author:

I can provide some benchmarks for this conversation. Are there any that you recommend as meaningful?

Member Author:

@tomchristie I created a branch here: https://github.com/uSpike/FrameworkBenchmarks/tree/starlette-anyio

See results: https://www.techempower.com/benchmarks/#section=test&shareid=c92a1338-0058-486c-9e47-c92ec4710b6a&hw=ph&test=query&a=2

There is a performance penalty to using anyio. The weighted composite score is 173 (starlette) vs 165 (starlette-anyio), so roughly a 4.5% drop in performance.

I'll add this result to the top comment.

Member Author:

Interestingly, the multiple queries test was ~25% faster with anyio.

Contributor:

I only see starlette and starlette-anyio. How do I run these benchmarks locally?

Member Author:

@agronholm if you click the other test tabs they should show up.

You can run locally with

./tfb --test starlette starlette-anyio sanic aiohttp

Member Author:

It's hard to get reproducible results on my local machine. I ran it again now with @graingert's changes to make task groups per-request, and now starlette-anyio is much closer to starlette performance: https://www.techempower.com/benchmarks/#section=test&shareid=d1edb7b5-09f0-43c0-a4cb-cca51f87801c

Member:

Task groups were per request before; they were just attached to the app.
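
For illustration, a rough sketch of the per-dispatch scoping suggested in uSpike#1 (hypothetical middleware name; this is not the code that was merged): the task group is opened and closed inside the middleware call, so its lifetime matches a single request rather than the whole application.

import anyio

from starlette.types import Receive, Scope, Send


class PerRequestTaskGroupMiddleware:
    # Hypothetical name, for illustration only.
    def __init__(self, app) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # The task group lives exactly as long as this request/response cycle.
        async with anyio.create_task_group() as task_group:
            scope["task_group"] = task_group
            await self.app(scope, receive, send)
            # Cancel anything still running once the downstream app returns.
            task_group.cancel_scope.cancel()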


# The following usages are now discouraged in favour of configuration
#  during Starlette.__init__(...)
31 changes: 17 additions & 14 deletions starlette/concurrency.py
@@ -1,43 +1,46 @@
import asyncio
import functools
import sys
import typing
from typing import Any, AsyncGenerator, Iterator

import anyio

try:
import contextvars # Python 3.7+ only or via contextvars backport.
except ImportError: # pragma: no cover
contextvars = None # type: ignore

if sys.version_info >= (3, 7): # pragma: no cover
from asyncio import create_task
else: # pragma: no cover
from asyncio import ensure_future as create_task

T = typing.TypeVar("T")


async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
tasks = [create_task(handler(**kwargs)) for handler, kwargs in args]
(done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
[task.cancel() for task in pending]
[task.result() for task in done]
result: Any = None
async with anyio.create_task_group() as task_group:

async def task(_handler: typing.Callable, _kwargs: dict) -> Any:
nonlocal result
result = await _handler(**_kwargs)
await task_group.cancel_scope.cancel()

for handler, kwargs in args:
await task_group.spawn(task, handler, kwargs)

return result


async def run_in_threadpool(
func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
loop = asyncio.get_event_loop()
if contextvars is not None: # pragma: no cover
# Ensure we run in the same context
child = functools.partial(func, *args, **kwargs)
context = contextvars.copy_context()
func = context.run
args = (child,)
elif kwargs: # pragma: no cover
# loop.run_in_executor doesn't accept 'kwargs', so bind them in here
# run_sync doesn't accept 'kwargs', so bind them in here
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(None, func, *args)
return await anyio.run_sync_in_worker_thread(func, *args)


class _StopIteration(Exception):
@@ -57,6 +60,6 @@ def _next(iterator: Iterator) -> Any:
async def iterate_in_threadpool(iterator: Iterator) -> AsyncGenerator:
while True:
try:
yield await run_in_threadpool(_next, iterator)
yield await anyio.run_sync_in_worker_thread(_next, iterator)
except _StopIteration:
break
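
A usage sketch for the reworked run_until_first_complete; the listen and send coroutines here are hypothetical, shown only to illustrate the (handler, kwargs) call signature:

import anyio

from starlette.concurrency import run_until_first_complete


async def listen(name: str) -> None:
    await anyio.sleep(0.1)
    print(f"{name}: listener finished first")


async def send(name: str) -> None:
    await anyio.sleep(10)  # loses the race and is cancelled


async def main() -> None:
    # Whichever handler completes first wins; the task group's cancel scope
    # then cancels the other handler.
    await run_until_first_complete((listen, {"name": "ws"}), (send, {"name": "ws"}))


anyio.run(main)
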
39 changes: 17 additions & 22 deletions starlette/middleware/base.py
@@ -1,9 +1,10 @@
import asyncio
import typing

import anyio

from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from starlette.types import ASGIApp, Receive, Scope, Send

RequestResponseEndpoint = typing.Callable[[Request], typing.Awaitable[Response]]
DispatchFunction = typing.Callable[
@@ -26,34 +27,28 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await response(scope, receive, send)

async def call_next(self, request: Request) -> Response:
loop = asyncio.get_event_loop()
queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

send_stream, recv_stream = anyio.create_memory_object_stream()
scope = request.scope
receive = request.receive
send = queue.put
task_group = scope["task_group"]

async def coro() -> None:
try:
await self.app(scope, receive, send)
finally:
await queue.put(None)
async with send_stream:
await self.app(scope, recv_stream.receive, send_stream.send)

task = loop.create_task(coro())
message = await queue.get()
if message is None:
task.result()
await task_group.spawn(coro)

try:
message = await recv_stream.receive()
except anyio.EndOfStream:
raise RuntimeError("No response returned.")

assert message["type"] == "http.response.start"

async def body_stream() -> typing.AsyncGenerator[bytes, None]:
while True:
message = await queue.get()
if message is None:
break
assert message["type"] == "http.response.body"
yield message.get("body", b"")
task.result()
async with recv_stream:
async for message in recv_stream:
assert message["type"] == "http.response.body"
yield message.get("body", b"")

response = StreamingResponse(
status_code=message["status"], content=body_stream()
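
For readers new to anyio memory object streams, a standalone sketch of the producer/consumer pattern that call_next now relies on; this is not Starlette code, and start_soon is the anyio 3 spelling of the spawn call shown in the diff:

import anyio


async def main() -> None:
    send_stream, receive_stream = anyio.create_memory_object_stream()

    async def producer() -> None:
        # Closing the send stream signals EndOfStream to the consumer, which is
        # how call_next detects that no response was returned.
        async with send_stream:
            for body in (b"hello", b"world"):
                await send_stream.send({"type": "http.response.body", "body": body})

    async with anyio.create_task_group() as task_group:
        task_group.start_soon(producer)
        async with receive_stream:
            async for message in receive_stream:
                print(message)


anyio.run(main)
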
65 changes: 29 additions & 36 deletions starlette/middleware/wsgi.py
@@ -1,10 +1,10 @@
import asyncio
import io
import sys
import typing

from starlette.concurrency import run_in_threadpool
from starlette.types import Message, Receive, Scope, Send
import anyio

from starlette.types import Receive, Scope, Send


def build_environ(scope: Scope, body: bytes) -> dict:
@@ -69,9 +69,7 @@ def __init__(self, app: typing.Callable, scope: Scope) -> None:
self.scope = scope
self.status = None
self.response_headers = None
self.send_event = asyncio.Event()
self.send_queue = [] # type: typing.List[typing.Optional[Message]]
self.loop = asyncio.get_event_loop()
self.stream_send, self.stream_receive = anyio.create_memory_object_stream()
self.response_started = False
self.exc_info = None # type: typing.Any

@@ -83,31 +81,25 @@ async def __call__(self, receive: Receive, send: Send) -> None:
body += message.get("body", b"")
more_body = message.get("more_body", False)
environ = build_environ(self.scope, body)
sender = None
try:
sender = self.loop.create_task(self.sender(send))
await run_in_threadpool(self.wsgi, environ, self.start_response)
self.send_queue.append(None)
self.send_event.set()
await asyncio.wait_for(sender, None)
if self.exc_info is not None:
raise self.exc_info[0].with_traceback(
self.exc_info[1], self.exc_info[2]
)
finally:
if sender and not sender.done():
sender.cancel() # pragma: no cover

async with anyio.create_task_group() as task_group:
try:
await task_group.spawn(self.sender, send)
async with self.stream_send:
await anyio.run_sync_in_worker_thread(
self.wsgi, environ, self.start_response
)
if self.exc_info is not None:
raise self.exc_info[0].with_traceback(
self.exc_info[1], self.exc_info[2]
)
finally:
await task_group.cancel_scope.cancel()

Contributor:

Suggested change:
- await task_group.cancel_scope.cancel()
+ task_group.cancel_scope.cancel()


async def sender(self, send: Send) -> None:
while True:
if self.send_queue:
message = self.send_queue.pop(0)
if message is None:
return
async with self.stream_receive:
async for message in self.stream_receive:
await send(message)
else:
await self.send_event.wait()
self.send_event.clear()

def start_response(
self,
@@ -124,21 +116,22 @@ def start_response(
(name.strip().encode("ascii").lower(), value.strip().encode("ascii"))
for name, value in response_headers
]
self.send_queue.append(
anyio.run_async_from_thread(
self.stream_send.send,
{
"type": "http.response.start",
"status": status_code,
"headers": headers,
}
},
)
self.loop.call_soon_threadsafe(self.send_event.set)

def wsgi(self, environ: dict, start_response: typing.Callable) -> None:
for chunk in self.app(environ, start_response):
self.send_queue.append(
{"type": "http.response.body", "body": chunk, "more_body": True}
anyio.run_async_from_thread(
self.stream_send.send,
{"type": "http.response.body", "body": chunk, "more_body": True},
)
self.loop.call_soon_threadsafe(self.send_event.set)

self.send_queue.append({"type": "http.response.body", "body": b""})
self.loop.call_soon_threadsafe(self.send_event.set)
anyio.run_async_from_thread(
self.stream_send.send, {"type": "http.response.body", "body": b""}
)
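
A standalone sketch of the thread-to-event-loop handoff used above; run_sync_in_worker_thread and run_async_from_thread are the anyio 2.x names this diff targets (later anyio releases renamed them to anyio.to_thread.run_sync and anyio.from_thread.run):

import anyio


async def main() -> None:
    received = []

    async def deliver(message: dict) -> None:
        received.append(message)

    def wsgi_like_worker() -> None:
        # Runs in a worker thread; each message is marshalled back to the
        # event loop thread, mirroring what wsgi() and start_response() do above.
        for chunk in (b"hello", b"world", b""):
            anyio.run_async_from_thread(
                deliver, {"type": "http.response.body", "body": chunk}
            )

    await anyio.run_sync_in_worker_thread(wsgi_like_worker)
    print(received)


anyio.run(main)
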
12 changes: 7 additions & 5 deletions starlette/requests.py
@@ -1,9 +1,10 @@
import asyncio
import json
import typing
from collections.abc import Mapping
from http import cookies as http_cookies

import anyio

from starlette.datastructures import URL, Address, FormData, Headers, QueryParams, State
from starlette.formparsers import FormParser, MultiPartParser
from starlette.types import Message, Receive, Scope, Send
@@ -251,10 +252,11 @@ async def close(self) -> None:

async def is_disconnected(self) -> bool:
if not self._is_disconnected:
try:
message = await asyncio.wait_for(self._receive(), timeout=0.0000001)
except asyncio.TimeoutError:
message = {}
message: Message = {}
async with anyio.move_on_after(
0.001
): # XXX: to small of a deadline and this blocks
message = await self._receive()

if message.get("type") == "http.disconnect":
self._is_disconnected = True
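
A standalone sketch of the polling idea above, written against the anyio 3.0 final API where move_on_after returns a synchronous context manager (this diff snapshot still uses the older async form):

import anyio

from starlette.types import Message, Receive


async def is_disconnected(receive: Receive) -> bool:
    message: Message = {}
    # Give receive() a tiny window; if nothing arrives, move on without blocking.
    with anyio.move_on_after(0.001):
        message = await receive()
    return message.get("type") == "http.disconnect"


async def main() -> None:
    async def idle_receive() -> Message:
        await anyio.sleep(1)  # nothing to receive; the scope times out first
        return {"type": "http.request"}

    print(await is_disconnected(idle_receive))  # False


anyio.run(main)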