A Stream object in a pending state won't be dropped after the client disconnects #1313
Comments
In my understanding (still new with Rust), if you return Pending, your future/stream will never be scheduled again unless it is awoken by some trigger. I think this also will prevent closing down the stream. I had a similar issue, and I never return Pending from the stream unless waiting on another future/stream.
I've run this code and it does print out multiple "PendingStream: Pending" messages after one request, so it seems it is getting awoken.
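To illustrate the polling contract under discussion, here is a minimal, self-contained sketch (not from this thread) of a future that returns Pending without arranging a wakeup; an executor has no obligation to ever poll it again unless something else wakes its task:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct ForeverPending;

impl Future for ForeverPending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // We neither complete nor wake the waker (_cx.waker()), so a
        // well-behaved executor will not poll this future again on its own.
        // It can still be polled if something else wakes the enclosing task,
        // e.g. a timer sharing the same task, which is what the repeated
        // "PendingStream: Pending" logs above suggest is happening.
        Poll::Pending
    }
}
```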
As @robjtede wrote, one of the possible causes is that …
In my understanding, it depends on how the streaming function is implemented. For example, we might be able to implement it like this:

```rust
// Below is pseudo-code.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio::io::AsyncWrite;

struct StreamingResponse<SO, ST> {
    socket: SO,
    stream: ST,
}

impl<SO, ST> Future for StreamingResponse<SO, ST>
where
    SO: AsyncWrite,
    ST: Stream,
{
    type Output = std::io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Implement the following sequence with a state machine:
        //
        // (1) poll self.socket.poll_write() with an empty buffer
        //     in order to detect the TCP disconnection,
        // (2) poll self.stream.poll_next() in order to receive data,
        // (3) poll self.socket.poll_write() in order to write the received data.
        //
        // I'm not sure whether (1) is possible...
        todo!()
    }
}
```
@robjtede ok, thanks for clarifying. I had no time to test the code; I just had a similar issue, and the documentation says …
What I really do not understand here is why the executor ever calls poll_next() again after it returned Pending (independent of the client's disconnect). So I'm out for now; first I need to investigate how the executor really works :-)
Prepared a reproduction environment: … It supports VS Code Remote Containers.
Investigated the periodic polls. The root cause of the periodic wakeups is the keep-alive timer. We can disable it by setting keep-alive off in the server configuration.
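For reference, a minimal sketch of turning keep-alive off, assuming actix-web 4.x where `HttpServer::keep_alive` accepts a `KeepAlive` value (the exact API differs slightly in older versions):

```rust
use actix_web::{http::KeepAlive, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new())
        // Disable the keep-alive timer; the periodic wakeups of pending
        // streams described above should disappear along with it.
        .keep_alive(KeepAlive::Disabled)
        .bind(("127.0.0.1", 3000))?
        .run()
        .await
}
```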
Investigated how actix-web detects client disconnect. When the client disconnects, … We may detect the client disconnect quickly if we can check it like below:

```rust
// Pseudo-code: the h1 Dispatcher's poll loop with an additional
// disconnect check (check_disconnect() is a proposed, hypothetical method).
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> {
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ...
        loop {
            ...
            if inner.check_disconnect() {
                // client disconnected
                return Poll::Ready(Err(...));
            }
            if inner.poll_flush(cx)? || !drain {
                break;
            }
        }
    }
}
```
I concluded that it's difficult to detect client disconnects while streams are pending. tokio's TcpStream seems to be based on Rust's TcpStream, which has a traditional socket interface. A TcpStream read/write call with an empty buffer returns immediately (typically Ok(0)) without detecting the disconnection. Issue #29 in the tokio project is a similar topic. Read/write with a non-empty buffer can detect the client disconnect, but that breaks HTTP request pipelining and the HTTP response. There might be a platform-specific workaround, but there are no useful functions in the crates that actix-web uses at this moment. Finally, I decided to feed some data at short intervals from my stream implementation in order to detect client disconnect quickly.
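To make the empty-buffer point concrete, a tiny sketch using std's blocking TcpStream (illustration only, not actix-web code):

```rust
use std::io::Read;
use std::net::TcpStream;

// An empty-buffer read returns Ok(0) immediately whether or not the peer
// has disconnected, so it carries no liveness information at all.
fn empty_read_tells_nothing(stream: &mut TcpStream) -> std::io::Result<()> {
    let mut empty = [0u8; 0];
    let n = stream.read(&mut empty)?;
    assert_eq!(n, 0); // Ok(0) either way: connected or closed
    Ok(())
}
```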
@robjtede You can close this issue if you don't have any additional investigation.
I haven't investigated further. Great work finding this out. It's comforting that the keep-alive checks were the cause of the polls. Are we considering this correct behavior?
If actix-web writes response data from a stream object for a request whose client has disconnected, I think that's incorrect behavior, because the response data will be garbage for subsequent pipelined requests on the keep-alive socket. I haven't confirmed it, but actix-web might write garbage response data... If actix-web wakes up tasks which process I/O on non-keep-alive sockets, that's not correct behavior either. My test program cannot test that, so we need to make another test program for it.
In an old version of tokio, TcpStream had a method for checking read readiness. This function was deleted at tokio-rs/tokio@6d8cc4e. Probably, we can see the reason for the deletion in a discussion on gitter: tokio-rs/tokio#1392 (comment)
Posted a question about this issue:
Thanks for all the investigation. I've run into it as well, and ideally the solution would be similar to what Go does, i.e. wait for the read side of the stream to close.
I've created tokio-rs/tokio#2743, which restores TcpStream's read-readiness polling method.
It's encouraging that they have acknowledged the issue and PR, but I'd rather not wait for a Tokio change since this is the last issue on the v3 blocker list. Can we say that, for now, perpetually pending streams are not allowed and remove this as a v3 blocker? It can be fixed a little later. What do you think @masnagam?
@robjtede Yes, you can. That's acceptable at least to me.
Note that the relevant Tokio PR has been merged and the methods are available: https://docs.rs/tokio/1.0.1/tokio/net/struct.TcpStream.html#method.poll_read_ready
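A minimal sketch (assuming tokio 1.x) of how poll_read_ready could back a disconnect check; poll_peer_activity is a hypothetical helper, not an actix-web API:

```rust
use std::io;
use std::task::{Context, Poll};

use tokio::net::TcpStream;

// Hypothetical helper: registers read-readiness interest so the task is
// woken when the peer sends data *or* closes its end (FIN), instead of
// sleeping forever on a pending body stream.
fn poll_peer_activity(stream: &TcpStream, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    stream.poll_read_ready(cx)
}
```

Once readiness fires, something like `TcpStream::peek` can distinguish real data (Ok with n > 0) from a closed peer (Ok(0)) without consuming bytes.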
The h1 dispatcher calls poll_next() on your stream and gives you the waker. But anyway, what you want to look into is …
As a caution to anyone still using Actix v3, this problem may come up if you are using Websockets. I had a very nasty bug come up where my server has a cap on the number of connections for a particular endpoint, and they were being exhausted. I checked my handler:

```rust
#[get("/the-endpoint")]
async fn endpoint(
    connection_tracker: web::Data<ConnectionTracker>,
    http_request: HttpRequest,
    mut http_stream: web::Payload,
) -> Result<HttpResponse, actix_web::Error> {
    /* Websocket and connection setup boilerplate */

    // THE INTERESTING BIT
    tokio::task::spawn_local(async move {
        debug!("Stream reader opening");
        let mut codec = actix_http::ws::Codec::new();
        let mut buf = BytesMut::with_capacity(65536);
        // `incoming_mesgs_tx` comes from the elided setup code above.
        while let Some(chunk_or_err) = http_stream.next().await {
            let chunk = match chunk_or_err {
                Ok(c) => c,
                Err(_) => break,
            };
            buf.put(chunk);
            match codec.decode(&mut buf) {
                Ok(maybe_frame) => {
                    if let Some(frame) = maybe_frame {
                        if incoming_mesgs_tx.send(Ok(frame)).await.is_err() {
                            break;
                        }
                    }
                }
                Err(protocol_err) => {
                    if incoming_mesgs_tx.send(Err(protocol_err)).await.is_err() {
                        break;
                    }
                }
            }
        }
        // I noticed these were not showing up consistently in the logs.
        // Ergo, the stream was never returning None.
        debug!("Stream reader closing");
    });

    /* More response and setup boilerplate */
}
```

Caveats

I am using Websockets to bridge into a Tokio application, so I'm not using the …

Workaround

With Websockets, I found this is fairly easy to work around, albeit imperfectly. After reading this thread, I figured I just needed to get the server to trigger a socket read or write semi-regularly in case a client disconnects abnormally. I set up the server to emit Websocket pings once a second for each client, and this has eliminated the problem. Probably just good practice to do this anyway 😄
@masnagam, @fakeshadow, many thanks for the detailed investigation. Finding this thread in 2023, I am at least able to understand the problem. However, it looks like this is still an issue in the latest version of actix-web. My application creates an HTTP response with a streaming body that may not produce any data for a long time; however, I still want to be able to drop the stream when the client disconnects. It appears that @fakeshadow had a fix, but the PR was never merged. @masnagam, did you manage to solve this, or perhaps you have any ideas?
Sorry, I don't work on actix anymore. I would be thankful not to be pinged in future conversations.
@ryzhyk There might be some workaround, but we simply switched to …
There is a bug in actix (actix/actix-web#1313) that prevents it from dropping HTTP connections on client disconnect unless the endpoint periodically sends some data. As a result, all API-based data transfers eventually start to fail in the UI. As a workaround, we generate an empty output chunk if there is no real payload to send for more than 3 seconds. Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
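A sketch of that workaround as a stream adapter, assuming the async-stream and futures crates; `with_keepalive_chunks` is a hypothetical helper name, not part of any of these libraries:

```rust
use std::time::Duration;

use actix_web::web::Bytes;
use async_stream::stream;
use futures::{Stream, StreamExt};

// Hypothetical adapter: passes `inner` through, but if no payload arrives
// for 3 seconds, yields an empty chunk so the dispatcher performs a write
// and can notice that the client has gone away.
fn with_keepalive_chunks<S, E>(mut inner: S) -> impl Stream<Item = Result<Bytes, E>>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
{
    stream! {
        loop {
            match tokio::time::timeout(Duration::from_secs(3), inner.next()).await {
                Ok(Some(item)) => yield item, // real payload
                Ok(None) => break,            // inner stream finished
                Err(_elapsed) => yield Ok(Bytes::new()), // keep-alive chunk
            }
        }
    }
}
```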
This seems like a pretty serious issue in actix-web, making it hard to use in scenarios that require streaming data continuously over HTTP (an increasingly popular thing to do nowadays). @robjtede, based on this thread, when this issue was originally created three years ago there really was not a good way to fix it. Do you think things have changed since? Do you see a solution to this? I notice there is another PR by @simorex80, #2830, that claims to fix this bug. @simorex80 and @robjtede, do you believe the PR solves the issue? Is it just a matter of adding a test for it? If so, I can look into writing such a test. Again, this seems like a pretty significant gap in an otherwise excellent framework. I'd love to see it fixed, so that users like myself don't need to resort to ugly workarounds.
As it happens, I'm looking to clean up the remaining issues for the v4.4 release, including this one, in the coming few days. I'll be assessing the open PRs that claim to solve this issue. Hopefully they include good test cases 🤞🏻
Awesome, really looking forward to it. And happy to help with testing.
Hi @ryzhyk, I can confirm that we have been using the fix #2830 in production for more than one year (overriding the cargo source definition). How you can reproduce the issue: …
This is the scenario we faced and fixed with that commit.
Thanks for the detailed summary, @simorex80!
@robjtede, just wondering if there's been any progress on this. Thanks!
I'm not sure if it was intended, but a Stream object in a pending state won't be dropped after the client disconnects. I found this issue in actix-web 2.0.0 and created a small test program like below:

https://github.com/masnagam/rust-case-studies/blob/master/actix-web-streaming-and-tcp-disconnect/src/main.rs

When executing the following command:

```
timeout 3 curl -v http://localhost:3000/pending
```

we never see "PendingStream: Dropped" in the log messages from the test program. If a stream returns a Poll::Ready, it will be dropped shortly. We can see that by executing:

```
timeout 3 curl -v http://localhost:3000/alternate
```

Is there any workaround to stop polling the pending stream shortly after the client disconnects?