From 07683eecb6a466d2f3e1c910f2c6237b03cec112 Mon Sep 17 00:00:00 2001 From: Lohith Bellad Date: Mon, 19 Oct 2020 12:15:10 -0700 Subject: [PATCH] Pacing: Pacing of egress QUIC packets --- include/quiche.h | 10 ++ src/ffi.rs | 37 ++++++++ src/lib.rs | 83 ++++++++++++++++- src/recovery/cubic.rs | 5 + src/recovery/mod.rs | 206 ++++++++++++++++++++++++++++++++++++++++++ src/recovery/reno.rs | 5 + 6 files changed, 341 insertions(+), 5 deletions(-) diff --git a/include/quiche.h b/include/quiche.h index 5da806d664..6b0dbee16c 100644 --- a/include/quiche.h +++ b/include/quiche.h @@ -257,6 +257,16 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len); // Writes a single QUIC packet to be sent to the peer. ssize_t quiche_conn_send(quiche_conn *conn, uint8_t *out, size_t out_len); +typedef struct { + // Time to send the packet out. + struct timespec send_time; +} send_info; + +// Writes a single QUIC packet to be sent to the peer and fills in `out_info` +// with the time at which the packet should be sent out. +ssize_t quiche_conn_send_with_info(quiche_conn *conn, uint8_t *out, + size_t out_len, send_info *out_info); + // Buffer holding data at a specific offset. 
typedef struct RangeBuf quiche_rangebuf; diff --git a/src/ffi.rs b/src/ffi.rs index 64f288430e..80ef38c325 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -38,6 +38,9 @@ use libc::c_void; use libc::size_t; use libc::ssize_t; +#[cfg(unix)] +use libc::timespec; + use crate::*; #[no_mangle] @@ -543,6 +546,40 @@ pub extern fn quiche_conn_send( } } +#[cfg(unix)] +#[repr(C)] +pub struct send_info { + pub send_time: timespec, +} + +#[no_mangle] +#[cfg(unix)] +pub extern fn quiche_conn_send_with_info( + conn: &mut Connection, out: *mut u8, out_len: size_t, + out_info: &mut send_info, +) -> ssize_t { + if out_len > <ssize_t>::max_value() as usize { + panic!("The provided buffer is too large"); + } + + let out = unsafe { slice::from_raw_parts_mut(out, out_len) }; + + match conn.send_with_info(out) { + Ok((v, info)) => { + unsafe { + ptr::copy_nonoverlapping( + &info.send_time as *const _ as *const timespec, + &mut out_info.send_time, + 1, + ) + }; + v as ssize_t + }, + + Err(e) => e.to_c(), + } +} + #[no_mangle] pub extern fn quiche_conn_stream_recv( conn: &mut Connection, stream_id: u64, out: *mut u8, out_len: size_t, diff --git a/src/lib.rs b/src/lib.rs index ba928e5b4b..92c555811a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1986,10 +1986,12 @@ impl Connection { Ok(read) } - /// Writes a single QUIC packet to be sent to the peer. + /// Writes a single QUIC packet to be sent to the peer along with send_info + /// such as the time to send the packet out. /// /// On success the number of bytes written to the output buffer is - /// returned, or [`Done`] if there was nothing to write. + /// returned along with send_info, or [`Done`] if there was nothing to + /// write. /// /// The application should call `send()` multiple times until [`Done`] is /// returned, indicating that there are no more packets to send. 
It is @@ -2019,8 +2021,11 @@ impl Connection { /// # let scid = [0xba; 16]; /// # let mut conn = quiche::accept(&scid, None, &mut config)?; /// loop { - /// let write = match conn.send(&mut out) { - /// Ok(v) => v, + /// let write = match conn.send_with_info(&mut out) { + /// Ok((v, send_info)) => { + /// // Use send_info + /// v + /// }, /// /// Err(quiche::Error::Done) => { /// // Done writing. @@ -2037,7 +2042,9 @@ impl Connection { /// } /// # Ok::<(), quiche::Error>(()) /// ``` - pub fn send(&mut self, out: &mut [u8]) -> Result<usize> { + pub fn send_with_info( + &mut self, out: &mut [u8], + ) -> Result<(usize, SendInfo)> { let now = time::Instant::now(); if out.is_empty() { @@ -2796,6 +2803,10 @@ impl Connection { &self.trace_id, ); + let out_info = SendInfo { + send_time: self.recovery.get_packet_send_time().unwrap_or(now), + }; + qlog_with!(self.qlog_streamer, q, { let ev = self.recovery.to_qlog(); q.add_event(ev).ok(); @@ -2828,6 +2839,62 @@ impl Connection { self.ack_eliciting_sent = true; } + Ok((written, out_info)) + } + + /// Writes a single QUIC packet to be sent to the peer. + /// + /// On success the number of bytes written to the output buffer is + /// returned, or [`Done`] if there was nothing to write. + /// + /// The application should call `send()` multiple times until [`Done`] is + /// returned, indicating that there are no more packets to send. It is + /// recommended that `send()` be called in the following cases: + /// + /// * When the application receives QUIC packets from the peer (that is, + /// any time [`recv()`] is also called). + /// + /// * When the connection timer expires (that is, any time [`on_timeout()`] + /// is also called). + /// + /// * When the application sends data to the peer (for example, any time + /// [`stream_send()`] or [`stream_shutdown()`] are called). 
+ /// + /// [`Done`]: enum.Error.html#variant.Done + /// [`recv()`]: struct.Connection.html#method.recv + /// [`on_timeout()`]: struct.Connection.html#method.on_timeout + /// [`stream_send()`]: struct.Connection.html#method.stream_send + /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown + /// + /// ## Examples: + /// + /// ```no_run + /// # let mut out = [0; 512]; + /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?; + /// # let scid = [0xba; 16]; + /// # let mut conn = quiche::accept(&scid, None, &mut config)?; + /// loop { + /// let write = match conn.send(&mut out) { + /// Ok(v) => v, + /// + /// Err(quiche::Error::Done) => { + /// // Done writing. + /// break; + /// }, + /// + /// Err(e) => { + /// // An error occurred, handle it. + /// break; + /// }, + /// }; + /// + /// socket.send(&out[..write]).unwrap(); + /// } + /// # Ok::<(), quiche::Error>(()) + /// ``` + pub fn send(&mut self, out: &mut [u8]) -> Result<usize> { + let (written, _) = self.send_with_info(out)?; Ok(written) } @@ -4381,6 +4448,12 @@ fn drop_pkt_on_err( Error::Done } +/// Info sent out on every send_with_info call. +pub struct SendInfo { + /// Time to send the packet out. + pub send_time: time::Instant, +} + /// Statistics about the connection. /// /// A connections's statistics can be collected using the [`stats()`] method. diff --git a/src/recovery/cubic.rs b/src/recovery/cubic.rs index 42b99cd115..4ba30c6661 100644 --- a/src/recovery/cubic.rs +++ b/src/recovery/cubic.rs @@ -49,6 +49,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps { on_packet_acked, congestion_event, collapse_cwnd, + has_custom_pacing, }; /// CUBIC Constants. 
@@ -292,6 +293,10 @@ fn congestion_event( } } +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/recovery/mod.rs b/src/recovery/mod.rs index de068ce48a..72f7a9a7bc 100644 --- a/src/recovery/mod.rs +++ b/src/recovery/mod.rs @@ -116,6 +116,8 @@ pub struct Recovery { bytes_acked: usize, + bytes_sent: usize, + congestion_recovery_start_time: Option<Instant>, max_datagram_size: usize, @@ -124,6 +126,11 @@ pub struct Recovery { // HyStart++. hystart: hystart::Hystart, + + // Pacing + pacing_rate: u64, + + last_packet_scheduled_time: Option<Instant>, } impl Recovery { @@ -178,6 +185,8 @@ impl Recovery { bytes_acked: 0, + bytes_sent: 0, + congestion_recovery_start_time: None, max_datagram_size: config.max_send_udp_payload_size, @@ -191,6 +200,10 @@ impl Recovery { app_limited: false, hystart: hystart::Hystart::new(config.hystart), + + pacing_rate: 0, + + last_packet_scheduled_time: None, } } @@ -233,6 +246,18 @@ impl Recovery { self.hystart.start_round(pkt_num); } + // Pacing: Set the pacing rate unless the CC algorithm paces itself + if !(self.cc_ops.has_custom_pacing)() { + if let Some(srtt) = self.smoothed_rtt { + let rate = (self.congestion_window as u64 * 1000000) / + srtt.as_micros().max(1) as u64; + self.set_pacing_rate(rate); + } + } + + self.schedule_next_packet(epoch, now, sent_bytes); + + self.bytes_sent += sent_bytes; trace!("{} {:?}", trace_id, self); } @@ -240,6 +265,43 @@ impl Recovery { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } + pub fn set_pacing_rate(&mut self, rate: u64) { + if rate > 0 { + self.pacing_rate = rate; + } + } + + pub fn get_packet_send_time(&self) -> Option<Instant> { + self.last_packet_scheduled_time + } + + fn schedule_next_packet( + &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, + ) { + // Pacing is not done in the following cases: + // 1. Packet epoch is not EPOCH_APPLICATION. + // 2. If packet has only ACK frames. + // 3. Start of the connection. 
+ if epoch != packet::EPOCH_APPLICATION || + packet_size == 0 || + self.bytes_sent <= self.congestion_window || + self.pacing_rate == 0 + { + self.last_packet_scheduled_time = Some(now); + return; + } + + let interval: u64 = (packet_size as u64 * 1000000) / self.pacing_rate; + let next_send_interval = Duration::from_micros(interval); + + let next_schedule_time = match self.last_packet_scheduled_time { + Some(last_scheduled_time) => last_scheduled_time + next_send_interval, + None => now, + }; + + self.last_packet_scheduled_time = Some(cmp::max(now, next_schedule_time)); + } + pub fn on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, @@ -845,6 +907,8 @@ pub struct CongestionControlOps { ), pub collapse_cwnd: fn(r: &mut Recovery), + + pub has_custom_pacing: fn() -> bool, } impl From<CongestionControlAlgorithm> for &'static CongestionControlOps { @@ -891,6 +955,12 @@ impl std::fmt::Debug for Recovery { self.congestion_recovery_start_time )?; write!(f, "{:?} ", self.delivery_rate)?; + write!(f, "pacing_rate={:?} ", self.pacing_rate)?; + write!( + f, + "last_packet_scheduled_time={:?} ", + self.last_packet_scheduled_time + )?; if self.hystart.enabled() { write!(f, "hystart={:?} ", self.hystart)?; @@ -1554,6 +1624,142 @@ mod tests { // Spurious loss. assert_eq!(r.lost_count, 1); } + + #[test] + fn test_packet_pacing() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let mut r = Recovery::new(&cfg); + + let mut now = Instant::now(); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + + // Send out first packet. 
+ let p = Sent { + pkt_num: 0, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // First packet will be sent out immediately. + assert_eq!(r.pacing_rate, 0); + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Wait 50ms for ACK. + now += Duration::from_millis(50); + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..1); + + assert_eq!( + r.on_ack_received( + &acked, + 10, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "" + ), + Ok(()) + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + assert_eq!(r.bytes_in_flight, 0); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // Send out second packet. + let p = Sent { + pkt_num: 1, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // Pacing is not done during the initial phase of the connection. + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Send the third packet out. 
+ let p = Sent { + pkt_num: 2, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2); + assert_eq!(r.bytes_in_flight, 13000); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // We pace this outgoing packet, as all conditions for pacing + // are met. + assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64); + assert_eq!( + r.get_packet_send_time().unwrap(), + now + Duration::from_micros( + (6500 * 1000000) / (12000.0 / 0.05) as u64 + ) + ); + } } mod cubic; diff --git a/src/recovery/reno.rs b/src/recovery/reno.rs index bb22b83ad9..ceb1a5bd11 100644 --- a/src/recovery/reno.rs +++ b/src/recovery/reno.rs @@ -43,6 +43,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps { on_packet_acked, congestion_event, collapse_cwnd, + has_custom_pacing, }; pub fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) { @@ -131,6 +132,10 @@ pub fn collapse_cwnd(r: &mut Recovery) { r.bytes_acked = 0; } +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*;