Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion actix-http/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changes

## Unreleased - 2021-xx-xx
## Unreleased - 2022-xx-xx
### Fixed
- Consume bytes from read buffer when `Payload` is dropped early. [#2764]


## 3.0.4 - 2022-03-09
Expand Down
40 changes: 27 additions & 13 deletions actix-http/src/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
};

use super::{
Expand Down Expand Up @@ -724,7 +724,7 @@ where
this.state.set(State::None);

// break out of payload decode loop
break;
return Ok(true);
}

// Either whole payload is read and loop is broken or more data
Expand All @@ -736,18 +736,18 @@ where
}
}

// not enough info to decide if connection is going to be clean or not
// no bytes in the read buffer, but there are still bytes to be read
// according to the content-length header. The client has stopped
// sending data early. Reset the state, set disconnection flag,
// and stop reading.
None => {
error!(
"handler did not read whole payload and dispatcher could not \
drain read buf; return 500 and close connection"
);

debug!("client stopped sending data; disconnecting");
// reset dispatcher state
this.flags.insert(Flags::SHUTDOWN);
let mut res = Response::internal_server_error().drop_body();
res.head_mut().set_connection_type(ConnectionType::Close);
this.messages.push_back(DispatcherMessage::Error(res));
*this.error = Some(DispatchError::HandlerDroppedPayload);
let _ = this.payload.take();
this.state.set(State::None);

// break out of payload decode loop
return Ok(true);
}
}
Expand Down Expand Up @@ -1010,8 +1010,22 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let this = self.project();
let mut this = self.project();
Self::read_available_projected(&mut this, cx)
}

/// Returns true when I/O stream can be disconnected after write to it.
/// Meant to be called when there is already access to a projected
/// `InnerDispatcher` available.
///
/// It covers these conditions:
/// - `std::io::ErrorKind::ConnectionReset` after partial read;
/// - all data read done.
#[inline(always)] // TODO: bench this inline
fn read_available_projected(
this: &mut InnerDispatcherProj<'_, T, S, B, X, U>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
if this.flags.contains(Flags::READ_DISCONNECT) {
return Ok(false);
};
Expand Down
68 changes: 68 additions & 0 deletions actix-http/src/h1/dispatcher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,74 @@ async fn upgrade_handling() {
.await;
}

#[actix_rt::test]
async fn handler_drop_large_payload() {
let _ = env_logger::try_init();

const CONTENT_LENGTH: usize = 256 * 1024;
let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap();
let buf = TestBuffer::new(http_msg(format!(
r"
POST /drop-payload HTTP/1.1
Content-Length: {}

{}",
CONTENT_LENGTH, content
)));

let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);

let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);

lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
assert!(h1.as_mut().poll(cx).is_pending());

// polls: manual
assert_eq!(h1.poll_count, 2);

let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];

let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC

payload dropped
",
);

assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);

if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
assert!(inner.state.is_none());
}
})
.await;
}

#[actix_rt::test]
async fn handler_drop_payload() {
let _ = env_logger::try_init();
Expand Down