From 2211080e805516f014db9015bd3132d138b24bf2 Mon Sep 17 00:00:00 2001 From: Lohith Bellad Date: Mon, 19 Oct 2020 12:15:10 -0700 Subject: [PATCH] Pacing: Pacing of egress QUIC packets --- include/quiche.h | 10 ++ src/ffi.rs | 37 ++++++++ src/lib.rs | 104 ++++++++++++++++++--- src/recovery/cubic.rs | 5 + src/recovery/mod.rs | 208 ++++++++++++++++++++++++++++++++++++++++++ src/recovery/reno.rs | 5 + 6 files changed, 357 insertions(+), 12 deletions(-) diff --git a/include/quiche.h b/include/quiche.h index 0f63acb7e3..0f147fdd45 100644 --- a/include/quiche.h +++ b/include/quiche.h @@ -258,6 +258,16 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len); // Writes a single QUIC packet to be sent to the peer. ssize_t quiche_conn_send(quiche_conn *conn, uint8_t *out, size_t out_len); +typedef struct { + // Time to send the packet out. + struct timespec send_time; +} quiche_send_info; + +// Writes a single QUIC packet to be sent to the peer and fills in the send_info +// struct. Linux only. +ssize_t quiche_conn_send_with_info(quiche_conn *conn, uint8_t *out, size_t out_len, + quiche_send_info *out_info); + // Reads contiguous data from a stream. ssize_t quiche_conn_stream_recv(quiche_conn *conn, uint64_t stream_id, uint8_t *out, size_t buf_len, bool *fin); diff --git a/src/ffi.rs b/src/ffi.rs index 4b66fd1713..1bf669158e 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -38,6 +38,9 @@ use libc::c_void; use libc::size_t; use libc::ssize_t; +#[cfg(target_os = "linux")] +use libc::timespec; + use crate::*; #[no_mangle] @@ -560,6 +563,40 @@ pub extern fn quiche_conn_send( } } +#[cfg(target_os = "linux")] +#[repr(C)] +pub struct send_info { + pub send_time: timespec, +} + +#[no_mangle] +#[cfg(target_os = "linux")] +pub extern fn quiche_conn_send_with_info( + conn: &mut Connection, out: *mut u8, out_len: size_t, + out_info: &mut send_info, +) -> ssize_t { + if out_len > ::max_value() as usize { + panic!("The provided buffer is too large"); + } + + let out = unsafe { slice::from_raw_parts_mut(out, out_len) }; + + match conn.send_with_info(out) { + Ok((v, info)) => { + unsafe { + ptr::copy_nonoverlapping( + &info.send_time as *const _ as *const timespec, + &mut out_info.send_time, + 1, + ) + }; + v as ssize_t + }, + + Err(e) => e.to_c(), + } +} + #[no_mangle] pub extern fn quiche_conn_stream_recv( conn: &mut Connection, stream_id: u64, out: *mut u8, out_len: size_t, diff --git a/src/lib.rs b/src/lib.rs index 555e840d3b..69016195cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2008,10 +2008,12 @@ impl Connection { Ok(read) } - /// Writes a single QUIC packet to be sent to the peer. + /// Writes a single QUIC packet to be sent to the peer along with + /// [`SendInfo`] which includes info like time to send the packet out. /// /// On success the number of bytes written to the output buffer is - /// returned, or [`Done`] if there was nothing to write. + /// returned along with [`SendInfo`], or [`Done`] if there was nothing + /// to write. /// /// The application should call `send()` multiple times until [`Done`] is /// returned, indicating that there are no more packets to send. It is @@ -2031,6 +2033,7 @@ impl Connection { /// [`on_timeout()`]: struct.Connection.html#method.on_timeout /// [`stream_send()`]: struct.Connection.html#method.stream_send /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown + /// [`SendInfo`]: struct.SendInfo.html /// /// ## Examples: /// @@ -2041,8 +2044,11 @@ impl Connection { /// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); /// # let mut conn = quiche::accept(&scid, None, &mut config)?; /// loop { - /// let write = match conn.send(&mut out) { - /// Ok(v) => v, + /// let write = match conn.send_with_info(&mut out) { + /// Ok((v, send_info)) => { + /// // Use send_info + /// v + /// }, /// /// Err(quiche::Error::Done) => { /// // Done writing. @@ -2059,11 +2065,15 @@ impl Connection { /// } /// # Ok::<(), quiche::Error>(()) /// ``` - pub fn send(&mut self, out: &mut [u8]) -> Result { + pub fn send_with_info( + &mut self, out: &mut [u8], + ) -> Result<(usize, SendInfo)> { if out.is_empty() { return Err(Error::BufferTooShort); } + let now = time::Instant::now(); + let mut has_initial = false; let mut done = 0; @@ -2081,7 +2091,7 @@ impl Connection { // Generate coalesced packets. while left > 0 { let (ty, written) = - match self.send_single(&mut out[done..done + left]) { + match self.send_single(&mut out[done..done + left], now) { Ok(v) => v, Err(Error::BufferTooShort) | Err(Error::Done) => break, @@ -2102,6 +2112,10 @@ impl Connection { }; } + let out_info = SendInfo { + send_time: self.recovery.get_packet_send_time().unwrap_or(now), + }; + if done == 0 { return Err(Error::Done); } @@ -2117,12 +2131,12 @@ impl Connection { done += pad_len; } - Ok(done) + Ok((done, out_info)) } - fn send_single(&mut self, out: &mut [u8]) -> Result<(packet::Type, usize)> { - let now = time::Instant::now(); - + fn send_single( + &mut self, out: &mut [u8], now: time::Instant, + ) -> Result<(packet::Type, usize)> { if out.is_empty() { return Err(Error::BufferTooShort); } @@ -2918,6 +2932,62 @@ impl Connection { Ok((pkt_type, written)) } + /// Writes a single QUIC packet to be sent to the peer. + /// + /// On success the number of bytes written to the output buffer is + /// returned, or [`Done`] if there was nothing to write. + /// + /// The application should call `send()` multiple times until [`Done`] is + /// returned, indicating that there are no more packets to send. It is + /// recommended that `send()` be called in the following cases: + /// + /// * When the application receives QUIC packets from the peer (that is, + /// any time [`recv()`] is also called). + /// + /// * When the connection timer expires (that is, any time [`on_timeout()`] + /// is also called). + /// + /// * When the application sends data to the peer (for examples, any time + /// [`stream_send()`] or [`stream_shutdown()`] are called). + /// + /// [`Done`]: enum.Error.html#variant.Done + /// [`recv()`]: struct.Connection.html#method.recv + /// [`on_timeout()`]: struct.Connection.html#method.on_timeout + /// [`stream_send()`]: struct.Connection.html#method.stream_send + /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown + /// + /// ## Examples: + /// + /// ```no_run + /// # let mut out = [0; 512]; + /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap(); + /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?; + /// # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]); + /// # let mut conn = quiche::accept(&scid, None, &mut config)?; + /// loop { + /// let write = match conn.send(&mut out) { + /// Ok(v) => v, + /// + /// Err(quiche::Error::Done) => { + /// // Done writing. + /// break; + /// }, + /// + /// Err(e) => { + /// // An error occurred, handle it. + /// break; + /// }, + /// }; + /// + /// socket.send(&out[..write]).unwrap(); + /// } + /// # Ok::<(), quiche::Error>(()) + /// ``` + pub fn send(&mut self, out: &mut [u8]) -> Result { + let (written, _) = self.send_with_info(out)?; + Ok(written) + } + // Returns the maximum size of a packet to be sent. // // This is a minimum of the sender's and the receiver's maximum UDP payload @@ -4484,6 +4554,12 @@ fn drop_pkt_on_err( Error::Done } +/// Info send out on every send_with_info call. +pub struct SendInfo { + /// Time to send the packet out. + pub send_time: time::Instant, +} + /// Statistics about the connection. /// /// A connections's statistics can be collected using the [`stats()`] method. @@ -8333,6 +8409,8 @@ mod tests { let mut pipe = testing::Pipe::default().unwrap(); + let mut now = time::Instant::now(); + // Client sends padded Initial. let len = pipe.client.send(&mut buf).unwrap(); assert_eq!(len, 1200); @@ -8344,13 +8422,15 @@ mod tests { testing::process_flight(&mut pipe.client, flight).unwrap(); // Client sends Initial packet with ACK. - let (ty, len) = pipe.client.send_single(&mut buf).unwrap(); + let (ty, len) = pipe.client.send_single(&mut buf, now).unwrap(); assert_eq!(ty, Type::Initial); assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len)); + now = time::Instant::now(); + // Client sends Handshake packet. - let (ty, len) = pipe.client.send_single(&mut buf).unwrap(); + let (ty, len) = pipe.client.send_single(&mut buf, now).unwrap(); assert_eq!(ty, Type::Handshake); // Packet type is corrupted to Initial. diff --git a/src/recovery/cubic.rs b/src/recovery/cubic.rs index 495302e5cb..cf6a4ba657 100644 --- a/src/recovery/cubic.rs +++ b/src/recovery/cubic.rs @@ -49,6 +49,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps { on_packet_acked, congestion_event, collapse_cwnd, + has_custom_pacing, }; /// CUBIC Constants. @@ -312,6 +313,10 @@ fn congestion_event( } } +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/recovery/mod.rs b/src/recovery/mod.rs index 92304c5f75..8b9b284d5d 100644 --- a/src/recovery/mod.rs +++ b/src/recovery/mod.rs @@ -121,6 +121,8 @@ pub struct Recovery { bytes_acked_ca: usize, + bytes_sent: usize, + congestion_recovery_start_time: Option, max_datagram_size: usize, @@ -129,6 +131,11 @@ pub struct Recovery { // HyStart++. hystart: hystart::Hystart, + + // Pacing + pacing_rate: u64, + + last_packet_scheduled_time: Option, } impl Recovery { @@ -185,6 +192,8 @@ impl Recovery { bytes_acked_ca: 0, + bytes_sent: 0, + congestion_recovery_start_time: None, max_datagram_size: config.max_send_udp_payload_size, @@ -198,6 +207,10 @@ impl Recovery { app_limited: false, hystart: hystart::Hystart::new(config.hystart), + + pacing_rate: 0, + + last_packet_scheduled_time: None, } } @@ -240,6 +253,18 @@ impl Recovery { self.hystart.start_round(pkt_num); } + // Pacing: Set the pacing rate if BBR is not used + if !(self.cc_ops.has_custom_pacing)() { + if let Some(srtt) = self.smoothed_rtt { + let rate = (self.congestion_window as u64 * 1000000) / + srtt.as_micros() as u64; + self.set_pacing_rate(rate); + } + } + + self.schedule_next_packet(epoch, now, sent_bytes); + + self.bytes_sent += sent_bytes; trace!("{} {:?}", trace_id, self); } @@ -247,6 +272,45 @@ impl Recovery { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } + pub fn set_pacing_rate(&mut self, rate: u64) { + if rate != 0 { + self.pacing_rate = rate; + } + } + + pub fn get_packet_send_time(&self) -> Option { + self.last_packet_scheduled_time + } + + fn schedule_next_packet( + &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, + ) { + // Don't pace in any of these cases: + // * Packet epoch is not EPOCH_APPLICATION. + // * Packet contains only ACK frames. + // * The start of the connection. + if epoch != packet::EPOCH_APPLICATION || + packet_size == 0 || + self.bytes_sent <= self.congestion_window || + self.pacing_rate == 0 + { + self.last_packet_scheduled_time = Some(now); + return; + } + + self.last_packet_scheduled_time = match self.last_packet_scheduled_time { + Some(last_scheduled_time) => { + let interval: u64 = + (packet_size as u64 * 1000000) / self.pacing_rate; + let interval = Duration::from_micros(interval); + let next_schedule_time = last_scheduled_time + interval; + Some(cmp::max(now, next_schedule_time)) + }, + + None => Some(now), + }; + } + pub fn on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, @@ -839,6 +903,8 @@ pub struct CongestionControlOps { ), pub collapse_cwnd: fn(r: &mut Recovery), + + pub has_custom_pacing: fn() -> bool, } impl From for &'static CongestionControlOps { @@ -885,6 +951,12 @@ impl std::fmt::Debug for Recovery { self.congestion_recovery_start_time )?; write!(f, "{:?} ", self.delivery_rate)?; + write!(f, "pacing_rate={:?}", self.pacing_rate)?; + write!( + f, + "last_packet_scheduled_time={:?}", + self.last_packet_scheduled_time + )?; if self.hystart.enabled() { write!(f, "hystart={:?} ", self.hystart)?; @@ -1548,6 +1620,142 @@ mod tests { // Spurious loss. assert_eq!(r.lost_count, 1); } + + #[test] + fn test_packet_pacing() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let mut r = Recovery::new(&cfg); + + let mut now = Instant::now(); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + + // send out first packet. + let p = Sent { + pkt_num: 0, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // First packet will be sent out immidiately. + assert_eq!(r.pacing_rate, 0); + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Wait 50ms for ACK. + now += Duration::from_millis(50); + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..1); + + assert_eq!( + r.on_ack_received( + &acked, + 10, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "" + ), + Ok(()) + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + assert_eq!(r.bytes_in_flight, 0); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // Send out second packet. + let p = Sent { + pkt_num: 1, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // Pacing is not done during intial phase of connection. + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Send the third packet out. + let p = Sent { + pkt_num: 2, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2); + assert_eq!(r.bytes_in_flight, 13000); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // We pace this outgoing packet. as all conditions for pacing + // are passed. + assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64); + assert_eq!( + r.get_packet_send_time().unwrap(), + now + Duration::from_micros( + (6500 * 1000000) / (12000.0 / 0.05) as u64 + ) + ); + } } mod cubic; diff --git a/src/recovery/reno.rs b/src/recovery/reno.rs index f7397e4157..bbb492eca5 100644 --- a/src/recovery/reno.rs +++ b/src/recovery/reno.rs @@ -43,6 +43,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps { on_packet_acked, congestion_event, collapse_cwnd, + has_custom_pacing, }; pub fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) { @@ -156,6 +157,10 @@ pub fn collapse_cwnd(r: &mut Recovery) { r.bytes_acked_ca = 0; } +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*;