Skip to content

feat(server): add h1 idle_timeout #3743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub(super) enum Kind {
/// User took too long to send headers
#[cfg(all(feature = "http1", feature = "server"))]
HeaderTimeout,
/// User took too long to send another request
#[cfg(all(feature = "http1", feature = "server"))]
IdleTimeout,
/// Error while reading a body from connection.
#[cfg(all(
any(feature = "client", feature = "server"),
Expand Down Expand Up @@ -360,6 +363,11 @@ impl Error {
Error::new(Kind::HeaderTimeout)
}

#[cfg(all(feature = "http1", feature = "server"))]
pub(super) fn new_idle_timeout() -> Error {
Error::new(Kind::IdleTimeout)
}

#[cfg(feature = "http1")]
#[cfg(feature = "server")]
pub(super) fn new_user_unsupported_status_code() -> Error {
Expand Down Expand Up @@ -458,6 +466,8 @@ impl Error {
Kind::Canceled => "operation was canceled",
#[cfg(all(feature = "http1", feature = "server"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(all(feature = "http1", feature = "server"))]
Kind::IdleTimeout => "idle client timeout",
#[cfg(all(
any(feature = "client", feature = "server"),
any(feature = "http1", feature = "http2")
Expand Down
125 changes: 92 additions & 33 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ where
#[cfg(feature = "server")]
h1_header_read_timeout: None,
#[cfg(feature = "server")]
h1_header_read_timeout_fut: None,
#[cfg(feature = "server")]
h1_header_read_timeout_running: false,
#[cfg(feature = "server")]
h1_idle_timeout: None,
#[cfg(feature = "server")]
h1_idle_timeout_running: false,
#[cfg(feature = "server")]
h1_timeout_fut: None,
#[cfg(feature = "server")]
first_head: true,
#[cfg(feature = "server")]
date_header: true,
#[cfg(feature = "server")]
timer: Time::Empty,
Expand Down Expand Up @@ -147,6 +153,11 @@ where
self.state.h1_header_read_timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_http1_idle_timeout(&mut self, val: Duration) {
self.state.h1_idle_timeout = Some(val);
}

#[cfg(feature = "server")]
pub(crate) fn set_allow_half_close(&mut self) {
self.state.allow_half_close = true;
Expand Down Expand Up @@ -216,25 +227,8 @@ where
debug_assert!(self.can_read_head());
trace!("Conn::read_head");

#[cfg(feature = "server")]
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_header_read_timeout_fut {
Some(ref mut h1_header_read_timeout_fut) => {
trace!("resetting h1 header read timeout timer");
self.state.timer.reset(h1_header_read_timeout_fut, deadline);
}
None => {
trace!("setting h1 header read timeout timer");
self.state.h1_header_read_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}

#[cfg_attr(not(feature = "server"), allow(unused))]
let mut progress = false;
let msg = match self.io.parse::<T>(
cx,
ParseContext {
Expand All @@ -249,20 +243,71 @@ where
#[cfg(feature = "ffi")]
on_informational: &mut self.state.on_informational,
},
&mut progress,
) {
Poll::Ready(Ok(msg)) => msg,
Poll::Ready(Err(e)) => return self.on_read_head_error(e),
Poll::Pending => {
// - Use the read timeout on the first head to avoid common DoS.
// - If made progress in reading header, must no longer be idle.
#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running {
if let Some(ref mut h1_header_read_timeout_fut) =
self.state.h1_header_read_timeout_fut
{
if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
self.state.h1_header_read_timeout_running = false;

warn!("read header from client timeout");
return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
if self.state.first_head || progress {
if !self.state.h1_header_read_timeout_running {
if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
debug_assert!(T::is_server());
let deadline = Instant::now() + h1_header_read_timeout;
self.state.h1_idle_timeout_running = false;
self.state.h1_header_read_timeout_running = true;
match self.state.h1_timeout_fut {
Some(ref mut ht_timeout_fut) => {
trace!("resetting h1 timeout timer for header read");
self.state.timer.reset(ht_timeout_fut, deadline);
}
None => {
trace!("setting h1 timeout timer for header read");
self.state.h1_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
} else if std::mem::take(&mut self.state.h1_idle_timeout_running) {
trace!("unsetting h1 timeout timer for idle");
self.state.h1_timeout_fut = None;
}
}
} else if !self.state.h1_header_read_timeout_running
&& !self.state.h1_idle_timeout_running
{
if let Some(h1_idle_timeout) = self.state.h1_idle_timeout {
debug_assert!(T::is_server());
let deadline = Instant::now() + h1_idle_timeout;
self.state.h1_idle_timeout_running = true;
match self.state.h1_timeout_fut {
Some(ref mut h1_timeout_fut) => {
trace!("resetting h1 timeout timer for idle");
self.state.timer.reset(h1_timeout_fut, deadline);
}
None => {
trace!("setting h1 timeout timer for idle");
self.state.h1_timeout_fut =
Some(self.state.timer.sleep_until(deadline));
}
}
}
}

#[cfg(feature = "server")]
if self.state.h1_header_read_timeout_running || self.state.h1_idle_timeout_running {
if let Some(ref mut h1_timeout_fut) = self.state.h1_timeout_fut {
if Pin::new(h1_timeout_fut).poll(cx).is_ready() {
return Poll::Ready(Some(Err(
if self.state.h1_header_read_timeout_running {
warn!("read header from client timeout");
crate::Error::new_header_timeout()
} else {
warn!("idle client timeout");
crate::Error::new_idle_timeout()
},
)));
}
}
}
Expand All @@ -274,7 +319,9 @@ where
#[cfg(feature = "server")]
{
self.state.h1_header_read_timeout_running = false;
self.state.h1_header_read_timeout_fut = None;
self.state.h1_idle_timeout_running = false;
self.state.h1_timeout_fut = None;
self.state.first_head = false;
}

// Note: don't deconstruct `msg` into local variables, it appears
Expand Down Expand Up @@ -928,10 +975,16 @@ struct State {
#[cfg(feature = "server")]
h1_header_read_timeout: Option<Duration>,
#[cfg(feature = "server")]
h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
h1_header_read_timeout_running: bool,
#[cfg(feature = "server")]
h1_idle_timeout: Option<Duration>,
#[cfg(feature = "server")]
h1_idle_timeout_running: bool,
#[cfg(feature = "server")]
h1_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
#[cfg(feature = "server")]
first_head: bool,
#[cfg(feature = "server")]
date_header: bool,
#[cfg(feature = "server")]
timer: Time,
Expand Down Expand Up @@ -1115,6 +1168,12 @@ impl State {
self.reading = Reading::Init;
self.writing = Writing::Init;

#[cfg(feature = "server")]
if self.h1_idle_timeout.is_some() {
// Next read will start and poll the idle timeout.
self.notify_read = true;
}

// !T::should_read_first() means Client.
//
// If Client connection has just gone idle, the Dispatcher
Expand Down
4 changes: 3 additions & 1 deletion src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ where
&mut self,
cx: &mut Context<'_>,
parse_ctx: ParseContext<'_>,
progress: &mut bool,
) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
where
S: Http1Transaction,
Expand Down Expand Up @@ -217,6 +218,7 @@ where
trace!("parse eof");
return Poll::Ready(Err(crate::Error::new_incomplete()));
}
*progress = true;
}
}

Expand Down Expand Up @@ -714,7 +716,7 @@ mod tests {
on_informational: &mut None,
};
assert!(buffered
.parse::<ClientTransaction>(cx, parse_ctx)
.parse::<ClientTransaction>(cx, parse_ctx, &mut false)
.is_pending());
Poll::Ready(())
})
Expand Down
20 changes: 20 additions & 0 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct Builder {
h1_preserve_header_case: bool,
h1_max_headers: Option<usize>,
h1_header_read_timeout: Dur,
h1_idle_timeout: Dur,
h1_writev: Option<bool>,
max_buf_size: Option<usize>,
pipeline_flush: bool,
Expand Down Expand Up @@ -238,6 +239,7 @@ impl Builder {
h1_preserve_header_case: false,
h1_max_headers: None,
h1_header_read_timeout: Dur::Default(Some(Duration::from_secs(30))),
h1_idle_timeout: Dur::Default(Some(Duration::from_secs(120))),
h1_writev: None,
max_buf_size: None,
pipeline_flush: false,
Expand Down Expand Up @@ -322,6 +324,21 @@ impl Builder {
self
}

/// Set a timeout for idle time between requests. If a client does not
/// transmit another request within this time after receiving the last
/// response, the connection is closed.
///
/// Requires a [`Timer`] set by [`Builder::timer`] to take effect. Panics if `idle_timeout` is configured
/// without a [`Timer`].
///
/// Pass `None` to disable.
///
/// Default is 120 seconds.
pub fn idle_timeout(&mut self, idle_timeout: impl Into<Option<Duration>>) -> &mut Self {
self.h1_idle_timeout = Dur::Configured(idle_timeout.into());
self
}

/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
Expand Down Expand Up @@ -448,6 +465,9 @@ impl Builder {
{
conn.set_http1_header_read_timeout(dur);
};
if let Some(dur) = self.timer.check(self.h1_idle_timeout, "idle_timeout") {
conn.set_http1_idle_timeout(dur);
};
if let Some(writev) = self.h1_writev {
if writev {
conn.set_write_strategy_queue();
Expand Down
Loading