Skip to content

Commit

Permalink
Make the fetch function faster by calling send immediately.
Browse files Browse the repository at this point in the history
The errors that occur during fetch calls may close the connection.
This PR also documents errors that can occur in a fetch call.
  • Loading branch information
ddragana committed Jul 9, 2020
1 parent ae4eb66 commit b9519fd
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 22 deletions.
1 change: 1 addition & 0 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ impl<'a> Handler<'a> {
.pop_front()
.expect("download_next called with empty queue");
match client.fetch(
Instant::now(),
&self.args.method,
&url.scheme(),
&url.host_str().unwrap(),
Expand Down
28 changes: 24 additions & 4 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl Http3Client {
/// If a new stream cannot be created an error will be return.
pub fn fetch(
&mut self,
now: Instant,
method: &str,
scheme: &str,
host: &str,
Expand All @@ -252,7 +253,10 @@ impl Http3Client {
_ => {}
}

let id = self.conn.stream_create(StreamType::BiDi)?;
let id = self
.conn
.stream_create(StreamType::BiDi)
.map_err(|e| Error::map_stream_create_errors(&e))?;

// Transform pseudo-header fields
let mut final_headers = Vec::new();
Expand All @@ -271,6 +275,22 @@ impl Http3Client {
Some(self.push_handler.clone()),
)),
);

// Call immediately send so that at least headers get sent. This will make Firefox faster, since
// it can send request body immediatly in most cases and does not need to do a complete process loop.
if let Err(e) = self
.base_handler
.send_streams
.get_mut(&id)
.ok_or(Error::InvalidStreamId)?
.send(&mut self.conn, &mut self.base_handler.qpack_encoder)
{
if e.connection_error() {
self.close(now, e.code(), "");
}
return Err(e);
}

Ok(id)
}

Expand Down Expand Up @@ -1003,7 +1023,7 @@ mod tests {
// Fetch request fetch("GET", "https", "something.com", "/", &[]).
fn make_request(client: &mut Http3Client, close_sending_side: bool) -> u64 {
let request_stream_id = client
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.unwrap();
if close_sending_side {
let _ = client.stream_close_send(request_stream_id);
Expand Down Expand Up @@ -2537,7 +2557,7 @@ mod tests {

// Check that a new request cannot be made.
assert_eq!(
client.fetch("GET", "https", "something.com", "/", &[]),
client.fetch(now(), "GET", "https", "something.com", "/", &[]),
Err(Error::AlreadyClosed)
);

Expand Down Expand Up @@ -3300,7 +3320,7 @@ mod tests {
fn zero_rtt_before_resumption_token() {
let mut client = default_http3_client();
assert!(client
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.is_err());
}

Expand Down
30 changes: 27 additions & 3 deletions neqo-http3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum Error {
TransportError(TransportError),
Unavailable,
Unexpected,
StreamLimitError,
TransportStreamDoesNotExist,
InvalidInput,
FatalError,
Expand Down Expand Up @@ -107,7 +108,6 @@ impl Error {
}
}

// TODO: dragana look into http3 errors
#[must_use]
pub fn connection_error(&self) -> bool {
match self {
Expand All @@ -120,7 +120,9 @@ impl Error {
| Self::HttpExcessiveLoad
| Self::HttpId
| Self::HttpSettings
| Self::HttpMissingSettings => true,
| Self::HttpMissingSettings
| Self::QpackError(QpackError::EncoderStream)
| Self::QpackError(QpackError::DecoderStream) => true,
_ => false,
}
}
Expand All @@ -139,6 +141,18 @@ impl Error {
}
}

#[must_use]
pub fn map_stream_create_errors(err: &TransportError) -> Self {
match err {
TransportError::ConnectionState => Error::Unavailable,
TransportError::StreamLimitError => Error::StreamLimitError,
_ => {
debug_assert!(false, "Unexpected error");
Error::TransportStreamDoesNotExist
}
}
}

#[must_use]
pub fn map_stream_recv_errors(err: &TransportError) -> Self {
match err {
Expand All @@ -163,6 +177,12 @@ impl Error {
_ => Error::InvalidResumptionToken,
}
}

#[must_use]
pub fn map_send_errors() -> Self {
debug_assert!(false, "Unexpected error");
Error::HttpInternal
}
}

impl From<TransportError> for Error {
Expand All @@ -173,7 +193,11 @@ impl From<TransportError> for Error {

impl From<QpackError> for Error {
fn from(err: QpackError) -> Self {
Self::QpackError(err)
match err {
QpackError::ClosedCriticalStream => Error::HttpClosedCriticalStream,
QpackError::InternalError => Error::HttpInternal,
e => Self::QpackError(e),
}
}
}

Expand Down
17 changes: 15 additions & 2 deletions neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl SendMessage {
}
}

/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
fn ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
if let SendMessageState::Initialized { headers, data, fin } = &self.state {
qdebug!([self], "Encoding headers");
Expand All @@ -225,6 +228,13 @@ impl SendMessage {
Ok(())
}

/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
/// `InvalidStreamId` if the stream does not exist,
/// `AlreadyClosed` if the stream has already been closed.
/// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output`
/// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.)
pub fn send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
self.ensure_encoded(conn, encoder)?;

Expand All @@ -235,14 +245,17 @@ impl SendMessage {
};

if let SendMessageState::SendingInitialMessage { ref mut buf, fin } = self.state {
let sent = conn.stream_send(self.stream_id, &buf)?;
let sent = conn
.stream_send(self.stream_id, &buf)
.map_err(|_| Error::map_send_errors())?;
qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, sent);

qtrace!([label], "{} bytes sent", sent);

if sent == buf.len() {
if fin {
conn.stream_close_send(self.stream_id)?;
conn.stream_close_send(self.stream_id)
.map_err(|_| Error::map_send_errors())?;
self.state = SendMessageState::Closed;
qtrace!([label], "done sending request");
} else {
Expand Down
2 changes: 1 addition & 1 deletion neqo-http3/tests/httpconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn test_fetch() {

eprintln!("-----client");
let req = hconn_c
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.unwrap();
assert_eq!(req, 0);
hconn_c.stream_close_send(req).unwrap();
Expand Down
7 changes: 4 additions & 3 deletions neqo-interop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ fn test_h3(nctx: &NetworkCtx, peer: &Peer, client: Connection, test: &Test) -> R

let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);

Expand All @@ -544,6 +544,7 @@ fn test_h3(nctx: &NetworkCtx, peer: &Peer, client: Connection, test: &Test) -> R
let client_stream_id = hc
.h3
.fetch(
Instant::now(),
"GET",
"https",
&hc.host,
Expand Down Expand Up @@ -580,7 +581,7 @@ fn test_h3_rz(
// Exchange some data to get http3 control streams and a resumption token.
let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);

Expand Down Expand Up @@ -635,7 +636,7 @@ fn test_h3_rz(
// SendH3 data during 0rtt
let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);
hc.streams.insert(client_stream_id);
Expand Down
28 changes: 20 additions & 8 deletions neqo-qpack/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE};
use crate::Header;
use crate::{Error, QpackSettings, Res};
use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace};
use neqo_transport::{Connection, StreamId};
use neqo_transport::{Connection, Error as TransportError, StreamId};
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryFrom;

Expand Down Expand Up @@ -260,7 +260,9 @@ impl QPackEncoder {

let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;

let sent = conn.stream_send_atomic(stream_id.as_u64(), &buf)?;
let sent = conn
.stream_send_atomic(stream_id.as_u64(), &buf)
.map_err(|e| map_stream_send_atomic_error(&e))?;
if !sent {
return Err(Error::EncoderStreamBlocked);
}
Expand Down Expand Up @@ -346,9 +348,9 @@ impl QPackEncoder {
}

/// Encodes headers
/// ### Errors
/// This function may return a transport error when a new entry is added to the table and while sending its
/// instruction an error occurs.
/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
pub fn encode_header_block(
&mut self,
conn: &mut Connection,
Expand Down Expand Up @@ -410,9 +412,7 @@ impl QPackEncoder {
encoded_h.encode_literal_with_name_literal(&name, &value)
}
Err(e) => {
// These errors should never happen:
// `Internal`, `InvalidStreamId`, `InvalidInput`, `FinalSizeError`
debug_assert!(false, "Unexpected error: {:?}", e);
// `InternalError`, `ClosedCriticalStream`
return Err(e);
}
}
Expand Down Expand Up @@ -499,6 +499,18 @@ fn map_error(err: &Error) -> Error {
}
}

fn map_stream_send_atomic_error(err: &TransportError) -> Error {
match err {
TransportError::InvalidStreamId | TransportError::FinalSizeError => {
Error::ClosedCriticalStream
}
_ => {
debug_assert!(false, "Unexpected error");
Error::InternalError
}
}
}

#[cfg(test)]
mod tests {
use super::{Connection, Error, Header, QPackEncoder, Res};
Expand Down
5 changes: 4 additions & 1 deletion neqo-transport/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2393,7 +2393,10 @@ impl Connection {
}

/// Create a stream.
// Returns new stream id
/// Returns new stream id
/// # Errors
/// `ConnectionState` if the connecton stat does not allow to create streams.
/// `StreamLimitError` if we are limiied by server's stream concurence.
pub fn stream_create(&mut self, st: StreamType) -> Res<u64> {
// Can't make streams while closing, otherwise rely on the stream limits.
match self.state {
Expand Down

0 comments on commit b9519fd

Please sign in to comment.