
Reuse Request's body buffer for call_next in BaseHTTPMiddleware #1692

Merged
merged 10 commits on Jun 1, 2023

Conversation

adriangb
Member

@adriangb adriangb commented Jun 15, 2022

Thought of this while working on related work to #1691. Sorry if this has already been proposed, I know we've gone around a couple times with similar stuff.

Background:

Basically, if the user calls Request.body() from their dispatch function, we cache the entire request body in memory and pass it on to downstream middleware. If they call Request.stream() instead, all we do is send an empty body so that downstream consumers don't hang forever.

I think this behavior is pretty sensible and doesn't use any unexpected memory (e.g. caching all of the body if Request.stream() was called). It also doesn't break the ASGI flow: if a downstream middleware replaces receive() or modifies messages, the downstream ASGI app (including the final endpoint) will see the new receive().

If this approach works well we could upstream this into the base Request so that arbitrary uses of Request can interoperate with ASGI apps/call chains.

Note that this does not fix #493 (comment) or any other case where the body is consumed in the endpoint and then Request.body() is called somewhere else (e.g. in BaseHTTPMiddleware after call_next() or in an exception handler).
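The two behaviors described above can be sketched with a minimal wrapped receive. This is a hypothetical stand-in, not Starlette's actual implementation: if body() was called, the cached bytes are replayed to the downstream app; if stream() was consumed, an empty final message is sent so downstream consumers don't hang.

```python
import asyncio

# Hypothetical sketch (not Starlette's code) of the replay logic described
# above: the middleware wraps receive() so the downstream app either gets
# the cached body back, or an empty final message if the stream was already
# consumed via stream().
def wrap_receive(cached_body, stream_consumed):
    sent = False

    async def receive():
        nonlocal sent
        if not sent:
            sent = True
            if cached_body is not None:
                # body() was called: replay the cached bytes downstream.
                return {"type": "http.request", "body": cached_body, "more_body": False}
            if stream_consumed:
                # stream() was consumed: send an empty final body so that
                # downstream consumers don't wait forever for more messages.
                return {"type": "http.request", "body": b"", "more_body": False}
        return {"type": "http.disconnect"}

    return receive

async def demo():
    replayed = await wrap_receive(b"hello", stream_consumed=False)()
    drained = await wrap_receive(None, stream_consumed=True)()
    return replayed, drained

replayed, drained = asyncio.run(demo())
```

Either way, no more memory is held than the middleware itself already forced: the cached bytes exist only because body() was called.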

@adriangb
Member Author

adriangb commented Sep 4, 2022

It also doesn't break the ASGI flow: if a downstream middleware replaces receive() or modifies messages, the downstream ASGI app (including the final endpoint) will see the new receive().

TODO: add a test to verify this.

Other misc TODO:

  • handle client disconnects (the test for this is fugly)

starlette/middleware/base.py (review thread resolved)
Comment on lines 197 to 203
self._stream_consumed = False
self._is_disconnected = False
self._stream_state = self._StreamState.connected
Member Author

This makes the state of "stream not consumed but disconnected" unrepresentable.

Member Author

To be clear: this PR can be implemented without this change, but I think it cleans things up nicely and makes the new code portion of this PR simpler.

starlette/requests.py (review thread resolved)
@adriangb adriangb changed the title buffer request stream in BaseHTTPMiddleware Reuse Request's body buffer for downstream ASGI apps Sep 6, 2022
@adriangb adriangb changed the title Reuse Request's body buffer for downstream ASGI apps Reuse Request's body buffer for call_next in BaseHTTPMiddleware Sep 6, 2022
Member

@Kludex Kludex left a comment

What does this mean, exactly:

If this approach works well we could upstream this into the base Request so that arbitrary uses of Request can interoperate with ASGI apps/call chains.

You mean that if we create a Request object inside a middleware/ASGI app, we can't get the body again if we use the same middleware twice, for example?

starlette/requests.py (review thread resolved)
starlette/requests.py (review thread resolved)
@adriangb
Member Author

adriangb commented Sep 9, 2022

What does this mean, exactly:

If this approach works well we could upstream this into the base Request so that arbitrary uses of Request can interoperate with ASGI apps/call chains.

You mean that if we create a Request object inside a middleware/ASGI app, we can't get the body again if we use the same middleware twice, for example?

What I mean here is that if we moved this implementation into the base Request class, it would allow folks not using BaseHTTPMiddleware but using Request (https://github.com/encode/starlette/blob/master/docs/middleware.md#reusing-starlette-components) to read bodies without breaking the downstream ASGI app, as long as they do receive = request.wrapped_receive.
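As a rough illustration of that pattern, here is a minimal, hypothetical stand-in for Request (not Starlette itself): the middleware reads the body once, then hands a wrapped receive to the downstream app so it still sees the body. A real implementation would also track replay state; `wrapped_receive` here just mimics the attribute this PR proposes.

```python
import asyncio

# Hypothetical MiniRequest standing in for starlette.requests.Request:
# body() drains receive() and caches the bytes; wrapped_receive() replays
# the cached bytes so a downstream ASGI app still gets the request body.
class MiniRequest:
    def __init__(self, receive):
        self._receive = receive
        self._body = None

    async def body(self):
        if self._body is None:
            chunks = []
            while True:
                message = await self._receive()
                chunks.append(message.get("body", b""))
                if not message.get("more_body", False):
                    break
            self._body = b"".join(chunks)
        return self._body

    async def wrapped_receive(self):
        # Replay the cached body as a single final message. A real
        # implementation would also handle the never-read and disconnect cases.
        body = await self.body()
        return {"type": "http.request", "body": body, "more_body": False}

async def demo():
    # Simulated ASGI receive yielding the body in two chunks.
    messages = [
        {"type": "http.request", "body": b"hel", "more_body": True},
        {"type": "http.request", "body": b"lo", "more_body": False},
    ]
    it = iter(messages)

    async def receive():
        return next(it)

    request = MiniRequest(receive)
    middleware_body = await request.body()            # read in the "middleware"
    downstream_message = await request.wrapped_receive()  # what downstream sees
    return middleware_body, downstream_message

middleware_body, downstream_message = asyncio.run(demo())
```

A pure ASGI middleware would then call `await self.app(scope, request.wrapped_receive, send)` instead of passing the original receive.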

Comment on lines 22 to 42
if getattr(self, "_body", None) is not None:
# body() was called
self._wrapped_rcv_state = self._StreamState.consumed
return {
"type": "http.request",
"body": self._body,
"more_body": False,
}
Member Author

Note that there is no additional buffering going on here: if the user calls request.body() on this Request instance within dispatch, we are already keeping all of the bytes around for the duration of the dispatch() call; we are just reusing that buffer for the downstream ASGI app. If the user never called request.body(), then we consume the body here, just as if the user had called request.stream().


@larsagny larsagny Oct 30, 2023


Just to be clear @adriangb: if I call request.body() before call_next, I'll be fine, right? Any endpoint that calls request.body() will get the cached data?

class CustomMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        my_body = await request.body()
        response = await call_next(request)
        return response

Member Author

Yes, that's the idea. And if you call it after call_next, things won't hang; you'll get an empty request body (if I remember correctly).


Thanks for getting back to me so quickly. The request doesn't hang anymore indeed, but you'll get a RuntimeError:

    if await request.body():
  File "/home/basicuser/.local/lib/python3.10/site-packages/starlette/requests.py", line 236, in body
    async for chunk in self.stream():
  File "/home/basicuser/.local/lib/python3.10/site-packages/starlette/requests.py", line 218, in stream
    raise RuntimeError("Stream consumed")
RuntimeError: Stream consumed

Member Author

That makes sense: the stream was consumed when the request body was read downstream, so it's not available in the middleware.

Comment on lines 197 to 203
self._stream_consumed = False
self._is_disconnected = False
self._stream_state = self._StreamState.connected
Member Author

To be clear: this PR can be implemented without this change, but I think it cleans things up nicely and makes the new code portion of this PR simpler.

starlette/requests.py (review thread resolved)
starlette/requests.py (review thread resolved)
@adriangb
Member Author

adriangb commented Oct 3, 2022

Note that this does not fix #493 (comment) or any other case where the body is consumed in the endpoint and then Request.body() is called somewhere else (e.g. in BaseHTTPMiddleware after call_next() or in an exception handler). These seem like finicky use cases; I don't think there's any general way to maintain full ASGI compatibility while supporting them, since they essentially require information to be transmitted "upstream". We could mitigate the "hang forever" behavior by detecting double consumption of the stream and erroring out, but that's about it.

@adriangb
Member Author

adriangb commented Oct 4, 2022

I tried to figure out a way to get use cases like #493 (comment) working but failed. Here's a comparison of before and after for this change:

[Two before/after comparison tables: rows and columns indicate where Request.body() is called first (endpoint, middleware, or error handler), with N/A on the diagonal; the individual cell values were lost in extraction.]

So this only really fixes the case that could already be fixed by moving to pure ASGI middleware, as discussed in #1519 (comment). The use cases this doesn't fix (reading the request body for a second time in an error handler or a middleware) are also not use cases you can get around by using pure ASGI middleware.

So the question is: do we move forward with this to fix one specific use case, or do we leave it as-is?

@ZhymabekRoman

Thank you all for developing and contributing to this library. Today I wanted to add logging to my back-end app, and I'm stuck on it. Is there any change in the status of this PR? I've seen this PR's changes and I think this implementation is too confusing; we have to look at this issue differently. Sorry if I don't express myself well, and please take this the right way, I have no doubt about your professionalism, but the middleware codebase requires a refactor, in my opinion.

@adriangb
Member Author

Thanks for the feedback @ZhymabekRoman.

I've seen this PR's changes and I think this implementation is too confusing; we have to look at this issue differently.

Is it confusing to you conceptually or the code itself?

but the middleware codebase requires a refactor, in my opinion

I don't get what you mean by this. Are you referring to BaseHTTPMiddleware? To all middleware?

@adriangb
Member Author

We could also detect stream double consumption here and prohibit calling stream() and body() after call_next() is called if the stream is consumed. That way there’s no error/hanging. With that I think we will have pretty much solved all of BaseHTTPMiddleware’s problems?

@Kludex
Member

Kludex commented Feb 10, 2023

We could also detect stream double consumption here and prohibit calling stream() and body() after call_next() is called if the stream is consumed. That way there’s no error/hanging. With that I think we will have pretty much solved all of BaseHTTPMiddleware’s problems?

Besides the ContextVar?

@adriangb
Member Author

Yep

@adriangb adriangb force-pushed the cache-stream-in-basehttpmiddleware branch from 0fa5897 to 2048c44 Compare February 13, 2023 20:43
@adriangb adriangb requested a review from Kludex February 13, 2023 20:43
@Kludex Kludex mentioned this pull request Feb 14, 2023
8 tasks
@Kludex Kludex mentioned this pull request Mar 16, 2023
11 tasks
@adriangb
Member Author

@tomchristie are there any other changes you'd like, or anything I can do to help get this reviewed?

@adriangb adriangb force-pushed the cache-stream-in-basehttpmiddleware branch 4 times, most recently from 1455e5c to 6ff01c0 Compare April 29, 2023 19:03
@adriangb adriangb force-pushed the cache-stream-in-basehttpmiddleware branch from 9f85a52 to 4ce3d73 Compare April 29, 2023 19:06
@adriangb adriangb force-pushed the cache-stream-in-basehttpmiddleware branch from 4ce3d73 to ec38227 Compare April 29, 2023 19:06
starlette/middleware/base.py (review thread resolved)
@@ -26,6 +88,8 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await self.app(scope, receive, send)
return

request = _CachedRequest(scope, receive)
Member

This doesn't need to be here, does it? Can it be in the same place it was instantiated before, or am I missing something?

Member Author

The next line references this. It could maybe be moved to where it was before, but at the very least it would need a new variable name like outer_request to differentiate it from the request: Request on line 95. It makes more sense to just move it up here; there is no harm in that.

@@ -223,7 +223,7 @@ async def stream(self) -> typing.AsyncGenerator[bytes, None]:
body = message.get("body", b"")
if body:
yield body
if not message.get("more_body", False):
if self._stream_consumed:
Member

Is this change needed? The _CachedRequest doesn't change the value of self._stream_consumed. 🤔

Member

The test test_read_request_stream_in_dispatch_after_app_calls_body fails without this logic.

Hmm... why doesn't more_body matter? Like, setting BaseHTTPMiddleware aside, why doesn't more_body matter for exiting the loop?

Hmmm... if we receive 2 chunks of body, how does this work? It doesn't look like we have a test that covers a standalone Request with multiple chunks. 🤔

Member

Or am I missing something?

Member Author

We really should have tests for Request as a standalone thing, since it is a standalone thing in the public API and... I've been encouraging people to use it, e.g. in ASGI middleware.

Member

We do, we just don't cover what I mention

Member Author

I'll add it. If you already prototyped it out in your head or on paper please comment it here and save me a few min haha.

Member

I don't recall how to do it from the TestClient's POV, but I thought about sending a stream with 2 chunks. Maybe you can use httpx directly if you can't do it with the TestClient.

I guess that would be enough to break this logic here, since the value of stream_consumed will not change

Member Author

Ok yes, you were right, I did have a bug, good catch. I still need to modify Request a bit; I added a couple of tests to explain why. TL;DR: we were marking the stream as consumed as soon as you called stream(), but in reality you can call stream(), get one message, and then call stream() again before it is consumed. Let me know if it's clear now.
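The fix described above can be sketched with a toy model (hypothetical, not Starlette's actual Request): the stream is only marked consumed once the final message (more_body=False) has been received, so a second stream() call can resume where a partial read left off.

```python
import asyncio

# Toy model of the consumed-flag fix: _stream_consumed flips only when the
# final message (more_body=False) arrives, not merely because stream() was
# called, so a fresh stream() call can resume a partially-read stream.
class MiniStream:
    def __init__(self, receive):
        self._receive = receive
        self._stream_consumed = False

    async def stream(self):
        if self._stream_consumed:
            raise RuntimeError("Stream consumed")
        while not self._stream_consumed:
            message = await self._receive()
            if not message.get("more_body", False):
                self._stream_consumed = True
            body = message.get("body", b"")
            if body:
                yield body

async def demo():
    messages = [
        {"type": "http.request", "body": b"chunk1", "more_body": True},
        {"type": "http.request", "body": b"chunk2", "more_body": False},
    ]
    it = iter(messages)

    async def receive():
        return next(it)

    req = MiniStream(receive)
    first = await req.stream().__anext__()          # partial read
    mid_state = req._stream_consumed                # still False here
    rest = [chunk async for chunk in req.stream()]  # second call resumes
    return first, mid_state, rest, req._stream_consumed

first, mid_state, rest, consumed = asyncio.run(demo())
```

With the consumed flag set eagerly instead, the second stream() call would have raised even though a chunk was still pending.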

starlette/middleware/base.py (review thread resolved)
starlette/middleware/base.py (review thread resolved)
@Kludex
Member

Kludex commented May 4, 2023

Sorry for the time it took to review this.

@adriangb adriangb force-pushed the cache-stream-in-basehttpmiddleware branch from c857977 to 96d5b49 Compare May 4, 2023 14:24
@adriangb
Member Author

adriangb commented May 4, 2023

@Kludex thank you for the review. No worries about the timeline; the only issue is that I start forgetting why I did things the way I did, so sorry if the answers to some of your questions weren't super clear.

I fixed the bug you found and reworked things to try to make the answers to your questions clearer by just looking at the code. I also added more tests, now there are 400+ lines of tests for 70 lines of implementation 😅 .

Member

@Kludex Kludex left a comment

Sorry for the long wait here.

Let's go 👍
