Skip to content

Commit

Permalink
Make the fetch function faster by calling send immediately.
Browse files Browse the repository at this point in the history
The errors that occur during fetch calls may close the connection.
This PR also documents errors that can occur in a fetch call.
  • Loading branch information
ddragana committed Jul 8, 2020
1 parent e427216 commit f7567ec
Showing 8 changed files with 97 additions and 22 deletions.
1 change: 1 addition & 0 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -299,6 +299,7 @@ impl<'a> Handler<'a> {
.pop_front()
.expect("download_next called with empty queue");
match client.fetch(
Instant::now(),
&self.args.method,
&url.scheme(),
&url.host_str().unwrap(),
28 changes: 24 additions & 4 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
@@ -217,6 +217,7 @@ impl Http3Client {
/// If a new stream cannot be created an error will be return.
pub fn fetch(
&mut self,
now: Instant,
method: &str,
scheme: &str,
host: &str,
@@ -240,7 +241,10 @@ impl Http3Client {
_ => {}
}

let id = self.conn.stream_create(StreamType::BiDi)?;
let id = self
.conn
.stream_create(StreamType::BiDi)
.map_err(|e| Error::map_stream_create_errors(&e))?;

// Transform pseudo-header fields
let mut final_headers = Vec::new();
@@ -259,6 +263,22 @@ impl Http3Client {
Some(self.push_handler.clone()),
)),
);

// Call immediately send so that at least headers get sent. This will make Firefox faster, since
// it can send request body immediatly in most cases and does not need to do a complete process loop.
if let Err(e) = self
.base_handler
.send_streams
.get_mut(&id)
.ok_or(Error::InvalidStreamId)?
.send(&mut self.conn, &mut self.base_handler.qpack_encoder)
{
if e.connection_error() {
self.close(now, e.code(), "");
}
return Err(e);
}

Ok(id)
}

@@ -988,7 +1008,7 @@ mod tests {
// Fetch request fetch("GET", "https", "something.com", "/", &[]).
fn make_request(client: &mut Http3Client, close_sending_side: bool) -> u64 {
let request_stream_id = client
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.unwrap();
if close_sending_side {
let _ = client.stream_close_send(request_stream_id);
@@ -2538,7 +2558,7 @@ mod tests {

// Check that a new request cannot be made.
assert_eq!(
client.fetch("GET", "https", "something.com", "/", &[]),
client.fetch(now(), "GET", "https", "something.com", "/", &[]),
Err(Error::AlreadyClosed)
);

@@ -3301,7 +3321,7 @@ mod tests {
fn zero_rtt_before_resumption_token() {
let mut client = default_http3_client();
assert!(client
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.is_err());
}

31 changes: 28 additions & 3 deletions neqo-http3/src/lib.rs
Original file line number Diff line number Diff line change
@@ -76,6 +76,8 @@ pub enum Error {
TransportError(TransportError),
Unavailable,
Unexpected,
StreamLimitError,
TransportStreamDoesNotExist,
}

impl Error {
@@ -104,7 +106,6 @@ impl Error {
}
}

// TODO: dragana look into http3 errors
#[must_use]
pub fn connection_error(&self) -> bool {
match self {
@@ -117,10 +118,30 @@ impl Error {
| Self::HttpExcessiveLoad
| Self::HttpId
| Self::HttpSettings
| Self::HttpMissingSettings => true,
| Self::HttpMissingSettings
| Self::QpackError(QpackError::EncoderStream)
| Self::QpackError(QpackError::DecoderStream) => true,
_ => false,
}
}

#[must_use]
pub fn map_stream_create_errors(err: &TransportError) -> Self {
match err {
TransportError::ConnectionState => Error::Unavailable,
TransportError::StreamLimitError => Error::StreamLimitError,
_ => {
debug_assert!(false, "Unexpected error");
Error::TransportStreamDoesNotExist
}
}
}

#[must_use]
pub fn map_send_errors() -> Self {
debug_assert!(false, "Unexpected error");
Error::HttpInternal
}
}

impl From<TransportError> for Error {
@@ -131,7 +152,11 @@ impl From<TransportError> for Error {

impl From<QpackError> for Error {
fn from(err: QpackError) -> Self {
Self::QpackError(err)
match err {
QpackError::ClosedCriticalStream => Error::HttpClosedCriticalStream,
QpackError::InternalError => Error::HttpInternal,
e => Self::QpackError(e),
}
}
}

17 changes: 15 additions & 2 deletions neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
@@ -199,6 +199,9 @@ impl SendMessage {
}
}

/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
fn ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
if let SendMessageState::Initialized { headers, data, fin } = &self.state {
qdebug!([self], "Encoding headers");
@@ -225,6 +228,13 @@ impl SendMessage {
Ok(())
}

/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
/// `InvalidStreamId` if the stream does not exist,
/// `AlreadyClosed` if the stream has already been closed.
/// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output`
/// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.)
pub fn send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
self.ensure_encoded(conn, encoder)?;

@@ -235,14 +245,17 @@ impl SendMessage {
};

if let SendMessageState::SendingInitialMessage { ref mut buf, fin } = self.state {
let sent = conn.stream_send(self.stream_id, &buf)?;
let sent = conn
.stream_send(self.stream_id, &buf)
.map_err(|_| Error::map_send_errors())?;
qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, sent);

qtrace!([label], "{} bytes sent", sent);

if sent == buf.len() {
if fin {
conn.stream_close_send(self.stream_id)?;
conn.stream_close_send(self.stream_id)
.map_err(|_| Error::map_send_errors())?;
self.state = SendMessageState::Closed;
qtrace!([label], "done sending request");
} else {
2 changes: 1 addition & 1 deletion neqo-http3/tests/httpconn.rs
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ fn test_fetch() {

eprintln!("-----client");
let req = hconn_c
.fetch("GET", "https", "something.com", "/", &[])
.fetch(now(), "GET", "https", "something.com", "/", &[])
.unwrap();
assert_eq!(req, 0);
hconn_c.stream_close_send(req).unwrap();
7 changes: 4 additions & 3 deletions neqo-interop/src/main.rs
Original file line number Diff line number Diff line change
@@ -530,7 +530,7 @@ fn test_h3(nctx: &NetworkCtx, peer: &Peer, client: Connection, test: &Test) -> R

let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);

@@ -544,6 +544,7 @@ fn test_h3(nctx: &NetworkCtx, peer: &Peer, client: Connection, test: &Test) -> R
let client_stream_id = hc
.h3
.fetch(
Instant::now(),
"GET",
"https",
&hc.host,
@@ -580,7 +581,7 @@ fn test_h3_rz(
// Exchange some data to get http3 control streams and a resumption token.
let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);

@@ -635,7 +636,7 @@ fn test_h3_rz(
// SendH3 data during 0rtt
let client_stream_id = hc
.h3
.fetch("GET", "https", &hc.host, &hc.path, &[])
.fetch(Instant::now(), "GET", "https", &hc.host, &hc.path, &[])
.unwrap();
let _ = hc.h3.stream_close_send(client_stream_id);
hc.streams.insert(client_stream_id);
28 changes: 20 additions & 8 deletions neqo-qpack/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use crate::table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE};
use crate::Header;
use crate::{Error, QpackSettings, Res};
use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace};
use neqo_transport::{Connection, StreamId};
use neqo_transport::{Connection, Error as TransportError, StreamId};
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryFrom;

@@ -260,7 +260,9 @@ impl QPackEncoder {

let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;

let sent = conn.stream_send_atomic(stream_id.as_u64(), &buf)?;
let sent = conn
.stream_send_atomic(stream_id.as_u64(), &buf)
.map_err(|e| map_stream_send_atomic_error(&e))?;
if !sent {
return Err(Error::EncoderStreamBlocked);
}
@@ -346,9 +348,9 @@ impl QPackEncoder {
}

/// Encodes headers
/// ### Errors
/// This function may return a transport error when a new entry is added to the table and while sending its
/// instruction an error occurs.
/// # Errors
/// `ClosedCriticalStream` if the encoder stream is closed.
/// `InternalError` if an unexpected error occurred.
pub fn encode_header_block(
&mut self,
conn: &mut Connection,
@@ -410,9 +412,7 @@ impl QPackEncoder {
encoded_h.encode_literal_with_name_literal(&name, &value)
}
Err(e) => {
// These errors should never happen:
// `Internal`, `InvalidStreamId`, `InvalidInput`, `FinalSizeError`
debug_assert!(false, "Unexpected error: {:?}", e);
// `InternalError`, `ClosedCriticalStream`
return Err(e);
}
}
@@ -499,6 +499,18 @@ fn map_error(err: &Error) -> Error {
}
}

fn map_stream_send_atomic_error(err: &TransportError) -> Error {
match err {
TransportError::InvalidStreamId | TransportError::FinalSizeError => {
Error::ClosedCriticalStream
}
_ => {
debug_assert!(false, "Unexpected error");
Error::InternalError
}
}
}

#[cfg(test)]
mod tests {
use super::{Connection, Error, Header, QPackEncoder, Res};
5 changes: 4 additions & 1 deletion neqo-transport/src/connection.rs
Original file line number Diff line number Diff line change
@@ -2379,7 +2379,10 @@ impl Connection {
}

/// Create a stream.
// Returns new stream id
/// Returns new stream id
/// # Errors
/// `ConnectionState` if the connecton stat does not allow to create streams.
/// `StreamLimitError` if we are limiied by server's stream concurence.
pub fn stream_create(&mut self, st: StreamType) -> Res<u64> {
// Can't make streams while closing, otherwise rely on the stream limits.
match self.state {

0 comments on commit f7567ec

Please sign in to comment.