diff --git a/include/quiche.h b/include/quiche.h index 2f316242f8..b3dbd37f39 100644 --- a/include/quiche.h +++ b/include/quiche.h @@ -273,8 +273,12 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len, const quiche_recv_info *info); typedef struct { + // The address the packet should be sent to. struct sockaddr_storage to; socklen_t to_len; + + // The time to send the packet out. + struct timespec at; } quiche_send_info; // Writes a single QUIC packet to be sent to the peer. diff --git a/src/ffi.rs b/src/ffi.rs index 04965be951..b2476562fa 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -40,6 +40,7 @@ use libc::c_void; use libc::size_t; use libc::sockaddr; use libc::ssize_t; +use libc::timespec; #[cfg(not(windows))] use libc::sockaddr_in; @@ -634,6 +635,8 @@ pub extern fn quiche_conn_recv( pub struct SendInfo { to: sockaddr_storage, to_len: socklen_t, + + at: timespec, } #[no_mangle] @@ -650,6 +653,8 @@ pub extern fn quiche_conn_send( Ok((v, info)) => { out_info.to_len = std_addr_to_c(&info.to, &mut out_info.to); + std_time_to_c(&info.at, &mut out_info.at); + v as ssize_t }, @@ -1092,3 +1097,17 @@ fn std_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t { } } } + +#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "windows")))] +fn std_time_to_c(time: &std::time::Instant, out: &mut timespec) { + unsafe { + ptr::copy_nonoverlapping(time as *const _ as *const timespec, out, 1) + } +} + +#[cfg(any(target_os = "macos", target_os = "ios", target_os = "windows"))] +fn std_time_to_c(_time: &std::time::Instant, out: &mut timespec) { + // TODO: implement Instant conversion for systems that don't use timespec. + out.tv_sec = 0; + out.tv_nsec = 0; +} diff --git a/src/lib.rs b/src/lib.rs index c6622a7fd4..8f4df8e0b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -490,6 +490,9 @@ pub struct RecvInfo { pub struct SendInfo { /// The address the packet should be sent to. pub to: SocketAddr, + + /// The time to send the packet out. + pub at: time::Instant, } /// Represents information carried by `CONNECTION_CLOSE` frames. @@ -2390,7 +2393,14 @@ impl Connection { done += pad_len; } - let info = SendInfo { to: self.peer_addr }; + let info = SendInfo { + to: self.peer_addr, + + at: self + .recovery + .get_packet_send_time() + .unwrap_or_else(time::Instant::now), + }; Ok((done, info)) } diff --git a/src/recovery/cubic.rs b/src/recovery/cubic.rs index f3d58aba29..f3b6322b5e 100644 --- a/src/recovery/cubic.rs +++ b/src/recovery/cubic.rs @@ -51,6 +51,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps { collapse_cwnd, checkpoint, rollback, + has_custom_pacing, }; /// CUBIC Constants. @@ -404,6 +405,10 @@ fn rollback(r: &mut Recovery) { r.congestion_recovery_start_time = r.cubic_state.prior.epoch_start; } +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/recovery/mod.rs b/src/recovery/mod.rs index 6043f32768..5cb1a5ae01 100644 --- a/src/recovery/mod.rs +++ b/src/recovery/mod.rs @@ -121,6 +121,8 @@ pub struct Recovery { bytes_acked_ca: usize, + bytes_sent: usize, + congestion_recovery_start_time: Option, max_datagram_size: usize, @@ -129,6 +131,11 @@ pub struct Recovery { // HyStart++. hystart: hystart::Hystart, + + // Pacing. + pacing_rate: u64, + + last_packet_scheduled_time: Option, } impl Recovery { @@ -185,6 +192,8 @@ impl Recovery { bytes_acked_ca: 0, + bytes_sent: 0, + congestion_recovery_start_time: None, max_datagram_size: config.max_send_udp_payload_size, @@ -198,6 +207,10 @@ impl Recovery { app_limited: false, hystart: hystart::Hystart::new(config.hystart), + + pacing_rate: 0, + + last_packet_scheduled_time: None, } } @@ -240,6 +253,18 @@ impl Recovery { self.hystart.start_round(pkt_num); } + // Pacing: Set the pacing rate if CC doesn't do its own. + if !(self.cc_ops.has_custom_pacing)() { + if let Some(srtt) = self.smoothed_rtt { + let rate = (self.congestion_window as u64 * 1000000) / + srtt.as_micros() as u64; + self.set_pacing_rate(rate); + } + } + + self.schedule_next_packet(epoch, now, sent_bytes); + + self.bytes_sent += sent_bytes; trace!("{} {:?}", trace_id, self); } @@ -247,6 +272,45 @@ impl Recovery { (self.cc_ops.on_packet_sent)(self, sent_bytes, now); } + pub fn set_pacing_rate(&mut self, rate: u64) { + if rate != 0 { + self.pacing_rate = rate; + } + } + + pub fn get_packet_send_time(&self) -> Option { + self.last_packet_scheduled_time + } + + fn schedule_next_packet( + &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, + ) { + // Don't pace in any of these cases: + // * Packet epoch is not EPOCH_APPLICATION. + // * Packet contains only ACK frames. + // * The start of the connection. + if epoch != packet::EPOCH_APPLICATION || + packet_size == 0 || + self.bytes_sent <= self.congestion_window || + self.pacing_rate == 0 + { + self.last_packet_scheduled_time = Some(now); + return; + } + + self.last_packet_scheduled_time = match self.last_packet_scheduled_time { + Some(last_scheduled_time) => { + let interval: u64 = + (packet_size as u64 * 1000000) / self.pacing_rate; + let interval = Duration::from_micros(interval); + let next_schedule_time = last_scheduled_time + interval; + Some(cmp::max(now, next_schedule_time)) + }, + + None => Some(now), + }; + } + pub fn on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, @@ -847,6 +911,8 @@ pub struct CongestionControlOps { pub checkpoint: fn(r: &mut Recovery), pub rollback: fn(r: &mut Recovery), + + pub has_custom_pacing: fn() -> bool, } impl From for &'static CongestionControlOps { @@ -893,6 +959,12 @@ impl std::fmt::Debug for Recovery { self.congestion_recovery_start_time )?; write!(f, "{:?} ", self.delivery_rate)?; + write!(f, "pacing_rate={:?}", self.pacing_rate)?; + write!( + f, + "last_packet_scheduled_time={:?}", + self.last_packet_scheduled_time + )?; if self.hystart.enabled() { write!(f, "hystart={:?} ", self.hystart)?; @@ -1556,6 +1628,142 @@ mod tests { // Spurious loss. assert_eq!(r.lost_count, 1); } + + #[test] + fn pacing() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let mut r = Recovery::new(&cfg); + + let mut now = Instant::now(); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + + // send out first packet. + let p = Sent { + pkt_num: 0, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // First packet will be sent out immidiately. + assert_eq!(r.pacing_rate, 0); + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Wait 50ms for ACK. + now += Duration::from_millis(50); + + let mut acked = ranges::RangeSet::default(); + acked.insert(0..1); + + assert_eq!( + r.on_ack_received( + &acked, + 10, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "" + ), + Ok(()) + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0); + assert_eq!(r.bytes_in_flight, 0); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // Send out second packet. + let p = Sent { + pkt_num: 1, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1); + assert_eq!(r.bytes_in_flight, 6500); + + // Pacing is not done during intial phase of connection. + assert_eq!(r.get_packet_send_time().unwrap(), now); + + // Send the third packet out. + let p = Sent { + pkt_num: 2, + frames: vec![], + time_sent: now, + time_acked: None, + time_lost: None, + size: 6500, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: now, + recent_delivered_packet_sent_time: now, + is_app_limited: false, + has_data: false, + }; + + r.on_packet_sent( + p, + packet::EPOCH_APPLICATION, + HandshakeStatus::default(), + now, + "", + ); + + assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2); + assert_eq!(r.bytes_in_flight, 13000); + assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50)); + + // We pace this outgoing packet. as all conditions for pacing + // are passed. + assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64); + assert_eq!( + r.get_packet_send_time().unwrap(), + now + Duration::from_micros( + (6500 * 1000000) / (12000.0 / 0.05) as u64 + ) + ); + } } mod cubic; diff --git a/src/recovery/reno.rs b/src/recovery/reno.rs index 98b5ec42ad..95ce58c739 100644 --- a/src/recovery/reno.rs +++ b/src/recovery/reno.rs @@ -45,6 +45,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps { collapse_cwnd, checkpoint, rollback, + has_custom_pacing, }; pub fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) { @@ -162,6 +163,10 @@ fn checkpoint(_r: &mut Recovery) {} fn rollback(_r: &mut Recovery) {} +fn has_custom_pacing() -> bool { + false +} + #[cfg(test)] mod tests { use super::*;