4 changes: 4 additions & 0 deletions actix-http/CHANGES.md
@@ -1,6 +1,10 @@
# Changes

## Unreleased - 2022-xx-xx
### Fixed
- Handlers that drop the request payload early, leaving the connection in an unclean state, no longer cause erroneous 500 responses. [#2745]

[#2745]: https://github.com/actix/actix-web/issues/2745


## 3.2.1 - 2022-07-02
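For context on the changelog entry above, the kind of handler that exercises this path looks roughly like the following. This is a minimal sketch assuming actix-web 4's public API; the route, handler name, and response status are illustrative and not taken from this PR.

```rust
use actix_web::{post, web, HttpResponse, Responder};

// The handler accepts a request body but returns without reading `_body`, so the
// payload is dropped early. Before this fix, the dispatcher could treat the resulting
// unclean connection state as an error and reply with a spurious 500 instead of the
// intended response.
#[post("/upload")]
async fn upload(_body: web::Payload) -> impl Responder {
    HttpResponse::PayloadTooLarge().finish()
}
```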
181 changes: 146 additions & 35 deletions actix-http/src/h1/dispatcher.rs
Expand Up @@ -22,7 +22,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
};

use super::{
@@ -185,7 +185,9 @@ pin_project! {
None,
ExpectCall { #[pin] fut: X::Future },
ServiceCall { #[pin] fut: S::Future },
SendResponse { res: Option<Response<B>> },
SendPayload { #[pin] body: B },
SendErrorResponse { res: Option<Response<BoxBody>> },
SendErrorPayload { #[pin] body: BoxBody },
}
}
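The two queued-response variants hold an `Option<Response<...>>` so the response can be moved out of the pinned state when the write loop reaches it. A stripped-down sketch of that pattern; the `WriteState` type and method names here are illustrative, not actix-http types.

```rust
// Minimal model of the "queue now, send later" pattern used by the new
// SendResponse/SendErrorResponse states.
enum WriteState<R> {
    Idle,
    Queued { res: Option<R> },
}

impl<R> WriteState<R> {
    // Move the queued response out without replacing the whole state, mirroring
    // `res.take().expect("response should be take-able")` in the dispatcher.
    fn take_queued(&mut self) -> Option<R> {
        match self {
            WriteState::Queued { res } => res.take(),
            WriteState::Idle => None,
        }
    }
}
```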
@@ -216,9 +218,15 @@
Self::ServiceCall { .. } => {
f.debug_struct("State::ServiceCall").finish_non_exhaustive()
}
Self::SendResponse { .. } => f
.debug_struct("State::SendResponse")
.finish_non_exhaustive(),
Self::SendPayload { .. } => {
f.debug_struct("State::SendPayload").finish_non_exhaustive()
}
Self::SendErrorResponse { .. } => f
.debug_struct("State::SendErrorResponse")
.finish_non_exhaustive(),
Self::SendErrorPayload { .. } => f
.debug_struct("State::SendErrorPayload")
.finish_non_exhaustive(),
@@ -379,11 +387,8 @@ where
Ok(size)
}

fn send_response(
mut self: Pin<&mut Self>,
res: Response<()>,
body: B,
) -> Result<(), DispatchError> {
fn send_response(mut self: Pin<&mut Self>, res: Response<B>) -> Result<(), DispatchError> {
let (res, body) = res.replace_body(());
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
@@ -397,11 +402,17 @@ where
Ok(())
}

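/// Queue a response to be written out later from the `SendResponse` state, where connection
/// handling (e.g. forcing `Connection: close` after an unclean read) is applied before sending.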
fn queue_response(self: Pin<&mut Self>, res: Response<B>) {
self.project()
.state
.set(State::SendResponse { res: Some(res) });
}

fn send_error_response(
mut self: Pin<&mut Self>,
res: Response<()>,
body: BoxBody,
res: Response<BoxBody>,
) -> Result<(), DispatchError> {
let (res, body) = res.replace_body(());
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
@@ -415,6 +426,12 @@ where
Ok(())
}

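/// Queue an error response; it is written out later from the `SendErrorResponse` state with the
/// same connection handling as `queue_response`.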
fn queue_error_response(self: Pin<&mut Self>, res: Response<BoxBody>) {
self.project()
.state
.set(State::SendErrorResponse { res: Some(res) });
}

fn send_continue(self: Pin<&mut Self>) {
self.project()
.write_buf
@@ -449,7 +466,8 @@ where
// queueing the error response updates the dispatcher state (SendErrorResponse, then
// SendErrorPayload or None once the head is written);
// continue the loop to poll it
self.as_mut().send_error_response(res, BoxBody::new(()))?;
self.as_mut()
.queue_error_response(res.set_body(BoxBody::new(())));
}

// return with upgrade request and poll it exclusively
@@ -470,30 +488,45 @@ where
match fut.poll(cx) {
// service call resolved. send response.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)?;
self.as_mut().queue_response(res.into());
}

// send service call error as response
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)?;
self.as_mut().queue_error_response(err.into());
}

// service call pending and could be waiting for more chunk messages
// (pipeline message limit and/or payload can_read limit)
Poll::Pending => {
// no new message is decoded and no new payload is fed
// nothing to do except waiting for new incoming data from client
if !self.as_mut().poll_request(cx)? {
return Ok(PollResponse::DoNothing);
}

// optimisation disabled so that poll_request is called from only one place
// if !self.as_mut().poll_request(cx)? {
return Ok(PollResponse::DoNothing);
// }

// else loop
}
}
}

StateProj::SendResponse { res } => {
let mut res = res.take().expect("response should be take-able");

if this.flags.contains(Flags::SHUTDOWN) {
trace!("shutdown flag set; assuming dirty read I/O");
// the shutdown flag is set when read I/O was not clean, so the connection should be
// closed to avoid a stuck connection or erroneous errors on the next request
res.head_mut().set_connection_type(ConnectionType::Close);
}

self.send_response(res)?;

return Ok(PollResponse::DrainWriteBuf);
}

StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
@@ -529,6 +562,23 @@ where
return Ok(PollResponse::DrainWriteBuf);
}

StateProj::SendErrorResponse { res } => {
// TODO: de-dupe impl with SendResponse

let mut res = res.take().expect("response should be take-able");

if this.flags.contains(Flags::SHUTDOWN) {
trace!("shutdown flag set; assuming dirty read I/O");
// the shutdown flag is set when read I/O was not clean, so the connection should be
// closed to avoid a stuck connection or erroneous errors on the next request
res.head_mut().set_connection_type(ConnectionType::Close);
}

self.send_error_response(res)?;

return Ok(PollResponse::DrainWriteBuf);
}

StateProj::SendErrorPayload { mut body } => {
// TODO: de-dupe impl with SendPayload

@@ -583,9 +633,7 @@ where

// send expect error as response
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)?;
self.as_mut().queue_error_response(err.into());
}

// expect must be solved before progress can be made.
@@ -637,9 +685,8 @@ where
// on success to notify the dispatcher a new state is set and the outer loop
// should be continued
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
return self.send_error_response(res, body);
self.queue_error_response(err.into());
return Ok(());
}

// future is pending; return Ok(()) to notify that a new state is
@@ -655,18 +702,17 @@ where
// to notify the dispatcher a new state is set and the outer loop
// should be continued.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)
self.as_mut().queue_response(res.into());
Ok(())
}

// see the comment on ExpectCall state branch's Pending
Poll::Pending => Ok(()),

// see the comment on ExpectCall state branch's Ready(Err(_))
Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(());
self.as_mut().send_error_response(res, body)
self.as_mut().queue_error_response(err.into());
Ok(())
}
};
}
@@ -688,15 +734,13 @@ where
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
let can_not_read = !self.can_read(cx);

// limit amount of non-processed requests
if pipeline_queue_full || can_not_read {
if pipeline_queue_full {
return Ok(false);
}

let mut this = self.as_mut().project();

let mut updated = false;

// decode from read buf as many full requests as possible
@@ -829,6 +873,72 @@ where
}
}

let can_read = self.can_read(cx);
let mut this = self.as_mut().project();

if !can_read {
// request payload is not readable...
tracing::debug!("cannot read request payload");

if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
tracing::debug!(
"handler dropped payload early; attempt to clean connection"
);

// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
// payload decoded did not yield EOF yet
Message::Chunk(Some(_)) => {
// if non-clean connection, next loop iter will detect empty
// read buffer and close connection
}

// connection is in clean state for next request
Message::Chunk(None) => {
tracing::debug!("connection successfully cleaned");

// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);

// break out of payload decode loop
break;
}

// Either whole payload is read and loop is broken or more data
// was expected in which case connection is closed. In both
// situations dispatcher cannot get here.
Message::Item(_) => {
unreachable!("dispatcher is in payload receive state")
}
}
}

// not enough info to decide if connection is going to be clean or not
None => {
tracing::debug!(
"handler did not read whole payload and dispatcher could not \
drain read buf; close connection"
);

this.flags.insert(Flags::SHUTDOWN);

return Ok(updated);
}
}
}
}
} else {
// cannot read and there is no request payload
return Ok(false);
}
}

Ok(updated)
}
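The drain loop above reduces to a simple decision: if the chunks already buffered for the dropped payload end with EOF, the connection can be reused; otherwise it must be closed. The following is a standalone, simplified sketch of that decision; the `Chunk` enum and function name are illustrative stand-ins, not actix-http types.

```rust
/// Stand-in for the decoded payload messages (`Message::Chunk(Some(_))` / `Message::Chunk(None)`).
enum Chunk {
    Data,
    Eof,
}

/// Returns `true` when the buffered payload ends cleanly (EOF reached), so the connection
/// can be kept alive for the next request; `false` means the payload is incomplete and the
/// dispatcher should set the SHUTDOWN flag and close the connection.
fn connection_reusable_after_drop(buffered: &[Chunk]) -> bool {
    for chunk in buffered {
        match chunk {
            // more payload data already buffered: keep draining
            Chunk::Data => continue,
            // end of payload: clean state for the next request
            Chunk::Eof => return true,
        }
    }
    // ran out of buffered data before EOF: cannot prove the connection is clean
    false
}
```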

@@ -844,10 +954,10 @@ where

trace!("timed out on slow request; replying with 408 and closing connection");

let _ = self.as_mut().send_error_response(
Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
BoxBody::new(()),
);
let mut res =
Response::with_body(StatusCode::REQUEST_TIMEOUT, BoxBody::new(()));
res.head_mut().set_connection_type(ConnectionType::Close);
self.as_mut().send_error_response(res)?;

self.project().flags.insert(Flags::SHUTDOWN);
}
@@ -1123,6 +1233,7 @@ where
}
}

// process request(s) and queue response
inner.as_mut().poll_request(cx)?;

if should_disconnect {