Skip to content

Commit

Permalink
Pacing: Pacing of egress QUIC packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Lohith Bellad committed Mar 4, 2021
1 parent 76598e4 commit 2211080
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 12 deletions.
10 changes: 10 additions & 0 deletions include/quiche.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len);
// Writes a single QUIC packet to be sent to the peer.
ssize_t quiche_conn_send(quiche_conn *conn, uint8_t *out, size_t out_len);

typedef struct {
// Time to send the packet out.
struct timespec send_time;
} quiche_send_info;

// Writes a single QUIC packet to be sent to the peer and fills in the send_info
// struct. Linux only.
ssize_t quiche_conn_send_with_info(quiche_conn *conn, uint8_t *out, size_t out_len,
quiche_send_info *out_info);

// Reads contiguous data from a stream.
ssize_t quiche_conn_stream_recv(quiche_conn *conn, uint64_t stream_id,
uint8_t *out, size_t buf_len, bool *fin);
Expand Down
37 changes: 37 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ use libc::c_void;
use libc::size_t;
use libc::ssize_t;

#[cfg(target_os = "linux")]
use libc::timespec;

use crate::*;

#[no_mangle]
Expand Down Expand Up @@ -560,6 +563,40 @@ pub extern fn quiche_conn_send(
}
}

#[cfg(target_os = "linux")]
#[repr(C)]
pub struct send_info {
pub send_time: timespec,
}

#[no_mangle]
#[cfg(target_os = "linux")]
pub extern fn quiche_conn_send_with_info(
conn: &mut Connection, out: *mut u8, out_len: size_t,
out_info: &mut send_info,
) -> ssize_t {
if out_len > <ssize_t>::max_value() as usize {
panic!("The provided buffer is too large");
}

let out = unsafe { slice::from_raw_parts_mut(out, out_len) };

match conn.send_with_info(out) {
Ok((v, info)) => {
unsafe {
ptr::copy_nonoverlapping(
&info.send_time as *const _ as *const timespec,
&mut out_info.send_time,
1,
)
};
v as ssize_t
},

Err(e) => e.to_c(),
}
}

#[no_mangle]
pub extern fn quiche_conn_stream_recv(
conn: &mut Connection, stream_id: u64, out: *mut u8, out_len: size_t,
Expand Down
104 changes: 92 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2008,10 +2008,12 @@ impl Connection {
Ok(read)
}

/// Writes a single QUIC packet to be sent to the peer.
/// Writes a single QUIC packet to be sent to the peer along with
/// [`SendInfo`] which includes info like time to send the packet out.
///
/// On success the number of bytes written to the output buffer is
/// returned, or [`Done`] if there was nothing to write.
/// returned along with [`SendInfo`], or [`Done`] if there was nothing
/// to write.
///
/// The application should call `send()` multiple times until [`Done`] is
/// returned, indicating that there are no more packets to send. It is
Expand All @@ -2031,6 +2033,7 @@ impl Connection {
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
/// [`stream_send()`]: struct.Connection.html#method.stream_send
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
/// [`SendInfo`]: struct.SendInfo.html
///
/// ## Examples:
///
Expand All @@ -2041,8 +2044,11 @@ impl Connection {
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let mut conn = quiche::accept(&scid, None, &mut config)?;
/// loop {
/// let write = match conn.send(&mut out) {
/// Ok(v) => v,
/// let write = match conn.send_with_info(&mut out) {
/// Ok((v, send_info)) => {
/// // Use send_info
/// v
/// },
///
/// Err(quiche::Error::Done) => {
/// // Done writing.
Expand All @@ -2059,11 +2065,15 @@ impl Connection {
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
pub fn send_with_info(
&mut self, out: &mut [u8],
) -> Result<(usize, SendInfo)> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}

let now = time::Instant::now();

let mut has_initial = false;

let mut done = 0;
Expand All @@ -2081,7 +2091,7 @@ impl Connection {
// Generate coalesced packets.
while left > 0 {
let (ty, written) =
match self.send_single(&mut out[done..done + left]) {
match self.send_single(&mut out[done..done + left], now) {
Ok(v) => v,

Err(Error::BufferTooShort) | Err(Error::Done) => break,
Expand All @@ -2102,6 +2112,10 @@ impl Connection {
};
}

let out_info = SendInfo {
send_time: self.recovery.get_packet_send_time().unwrap_or(now),
};

if done == 0 {
return Err(Error::Done);
}
Expand All @@ -2117,12 +2131,12 @@ impl Connection {
done += pad_len;
}

Ok(done)
Ok((done, out_info))
}

fn send_single(&mut self, out: &mut [u8]) -> Result<(packet::Type, usize)> {
let now = time::Instant::now();

fn send_single(
&mut self, out: &mut [u8], now: time::Instant,
) -> Result<(packet::Type, usize)> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}
Expand Down Expand Up @@ -2918,6 +2932,62 @@ impl Connection {
Ok((pkt_type, written))
}

/// Writes a single QUIC packet to be sent to the peer.
///
/// On success the number of bytes written to the output buffer is
/// returned, or [`Done`] if there was nothing to write.
///
/// The application should call `send()` multiple times until [`Done`] is
/// returned, indicating that there are no more packets to send. It is
/// recommended that `send()` be called in the following cases:
///
/// * When the application receives QUIC packets from the peer (that is,
/// any time [`recv()`] is also called).
///
/// * When the connection timer expires (that is, any time [`on_timeout()`]
/// is also called).
///
/// * When the application sends data to the peer (for examples, any time
/// [`stream_send()`] or [`stream_shutdown()`] are called).
///
/// [`Done`]: enum.Error.html#variant.Done
/// [`recv()`]: struct.Connection.html#method.recv
/// [`on_timeout()`]: struct.Connection.html#method.on_timeout
/// [`stream_send()`]: struct.Connection.html#method.stream_send
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
///
/// ## Examples:
///
/// ```no_run
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
/// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
/// # let mut conn = quiche::accept(&scid, None, &mut config)?;
/// loop {
/// let write = match conn.send(&mut out) {
/// Ok(v) => v,
///
/// Err(quiche::Error::Done) => {
/// // Done writing.
/// break;
/// },
///
/// Err(e) => {
/// // An error occurred, handle it.
/// break;
/// },
/// };
///
/// socket.send(&out[..write]).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
let (written, _) = self.send_with_info(out)?;
Ok(written)
}

// Returns the maximum size of a packet to be sent.
//
// This is a minimum of the sender's and the receiver's maximum UDP payload
Expand Down Expand Up @@ -4484,6 +4554,12 @@ fn drop_pkt_on_err(
Error::Done
}

/// Info send out on every send_with_info call.
pub struct SendInfo {
/// Time to send the packet out.
pub send_time: time::Instant,
}

/// Statistics about the connection.
///
/// A connections's statistics can be collected using the [`stats()`] method.
Expand Down Expand Up @@ -8333,6 +8409,8 @@ mod tests {

let mut pipe = testing::Pipe::default().unwrap();

let mut now = time::Instant::now();

// Client sends padded Initial.
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(len, 1200);
Expand All @@ -8344,13 +8422,15 @@ mod tests {
testing::process_flight(&mut pipe.client, flight).unwrap();

// Client sends Initial packet with ACK.
let (ty, len) = pipe.client.send_single(&mut buf).unwrap();
let (ty, len) = pipe.client.send_single(&mut buf, now).unwrap();
assert_eq!(ty, Type::Initial);

assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));

now = time::Instant::now();

// Client sends Handshake packet.
let (ty, len) = pipe.client.send_single(&mut buf).unwrap();
let (ty, len) = pipe.client.send_single(&mut buf, now).unwrap();
assert_eq!(ty, Type::Handshake);

// Packet type is corrupted to Initial.
Expand Down
5 changes: 5 additions & 0 deletions src/recovery/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps {
on_packet_acked,
congestion_event,
collapse_cwnd,
has_custom_pacing,
};

/// CUBIC Constants.
Expand Down Expand Up @@ -312,6 +313,10 @@ fn congestion_event(
}
}

fn has_custom_pacing() -> bool {
false
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 2211080

Please sign in to comment.