Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pacing: Pacing of egress QUIC packets #770

Merged
merged 1 commit into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/quiche.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ ssize_t quiche_conn_recv(quiche_conn *conn, uint8_t *buf, size_t buf_len,
const quiche_recv_info *info);

typedef struct {
// The address the packet should be sent to.
struct sockaddr_storage to;
socklen_t to_len;

// The time to send the packet out.
struct timespec at;
} quiche_send_info;
lohith-bellad marked this conversation as resolved.
Show resolved Hide resolved

// Writes a single QUIC packet to be sent to the peer.
Expand Down
19 changes: 19 additions & 0 deletions src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use libc::c_void;
use libc::size_t;
use libc::sockaddr;
use libc::ssize_t;
use libc::timespec;

#[cfg(not(windows))]
use libc::sockaddr_in;
Expand Down Expand Up @@ -634,6 +635,8 @@ pub extern fn quiche_conn_recv(
pub struct SendInfo {
to: sockaddr_storage,
to_len: socklen_t,

at: timespec,
}

#[no_mangle]
Expand All @@ -650,6 +653,8 @@ pub extern fn quiche_conn_send(
Ok((v, info)) => {
out_info.to_len = std_addr_to_c(&info.to, &mut out_info.to);

std_time_to_c(&info.at, &mut out_info.at);

v as ssize_t
},

Expand Down Expand Up @@ -1092,3 +1097,17 @@ fn std_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t {
}
}
}

#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "windows")))]
fn std_time_to_c(time: &std::time::Instant, out: &mut timespec) {
unsafe {
ptr::copy_nonoverlapping(time as *const _ as *const timespec, out, 1)
}
}

#[cfg(any(target_os = "macos", target_os = "ios", target_os = "windows"))]
fn std_time_to_c(_time: &std::time::Instant, out: &mut timespec) {
// TODO: implement Instant conversion for systems that don't use timespec.
out.tv_sec = 0;
out.tv_nsec = 0;
}
12 changes: 11 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ pub struct RecvInfo {
pub struct SendInfo {
/// The address the packet should be sent to.
pub to: SocketAddr,

/// The time to send the packet out.
pub at: time::Instant,
}

/// Represents information carried by `CONNECTION_CLOSE` frames.
Expand Down Expand Up @@ -2390,7 +2393,14 @@ impl Connection {
done += pad_len;
}

let info = SendInfo { to: self.peer_addr };
let info = SendInfo {
to: self.peer_addr,

at: self
.recovery
.get_packet_send_time()
.unwrap_or_else(time::Instant::now),
};

Ok((done, info))
}
Expand Down
5 changes: 5 additions & 0 deletions src/recovery/cubic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub static CUBIC: CongestionControlOps = CongestionControlOps {
collapse_cwnd,
checkpoint,
rollback,
has_custom_pacing,
};

/// CUBIC Constants.
Expand Down Expand Up @@ -404,6 +405,10 @@ fn rollback(r: &mut Recovery) {
r.congestion_recovery_start_time = r.cubic_state.prior.epoch_start;
}

fn has_custom_pacing() -> bool {
false
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
208 changes: 208 additions & 0 deletions src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ pub struct Recovery {

bytes_acked_ca: usize,

bytes_sent: usize,

congestion_recovery_start_time: Option<Instant>,

max_datagram_size: usize,
Expand All @@ -129,6 +131,11 @@ pub struct Recovery {

// HyStart++.
hystart: hystart::Hystart,

// Pacing.
pacing_rate: u64,

last_packet_scheduled_time: Option<Instant>,
}

impl Recovery {
Expand Down Expand Up @@ -185,6 +192,8 @@ impl Recovery {

bytes_acked_ca: 0,

bytes_sent: 0,

congestion_recovery_start_time: None,

max_datagram_size: config.max_send_udp_payload_size,
Expand All @@ -198,6 +207,10 @@ impl Recovery {
app_limited: false,

hystart: hystart::Hystart::new(config.hystart),

pacing_rate: 0,

last_packet_scheduled_time: None,
}
}

Expand Down Expand Up @@ -240,13 +253,64 @@ impl Recovery {
self.hystart.start_round(pkt_num);
}

// Pacing: Set the pacing rate if CC doesn't do its own.
if !(self.cc_ops.has_custom_pacing)() {
if let Some(srtt) = self.smoothed_rtt {
let rate = (self.congestion_window as u64 * 1000000) /
srtt.as_micros() as u64;
self.set_pacing_rate(rate);
}
}

self.schedule_next_packet(epoch, now, sent_bytes);

self.bytes_sent += sent_bytes;
trace!("{} {:?}", trace_id, self);
}

fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
(self.cc_ops.on_packet_sent)(self, sent_bytes, now);
}

pub fn set_pacing_rate(&mut self, rate: u64) {
if rate != 0 {
self.pacing_rate = rate;
}
}

pub fn get_packet_send_time(&self) -> Option<Instant> {
self.last_packet_scheduled_time
}

fn schedule_next_packet(
&mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
) {
// Don't pace in any of these cases:
// * Packet epoch is not EPOCH_APPLICATION.
// * Packet contains only ACK frames.
// * The start of the connection.
if epoch != packet::EPOCH_APPLICATION ||
packet_size == 0 ||
self.bytes_sent <= self.congestion_window ||
self.pacing_rate == 0
{
self.last_packet_scheduled_time = Some(now);
return;
}

self.last_packet_scheduled_time = match self.last_packet_scheduled_time {
Some(last_scheduled_time) => {
let interval: u64 =
(packet_size as u64 * 1000000) / self.pacing_rate;
let interval = Duration::from_micros(interval);
let next_schedule_time = last_scheduled_time + interval;
Some(cmp::max(now, next_schedule_time))
},

None => Some(now),
};
}

pub fn on_ack_received(
&mut self, ranges: &ranges::RangeSet, ack_delay: u64,
epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
Expand Down Expand Up @@ -847,6 +911,8 @@ pub struct CongestionControlOps {
pub checkpoint: fn(r: &mut Recovery),

pub rollback: fn(r: &mut Recovery),

pub has_custom_pacing: fn() -> bool,
}

impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
Expand Down Expand Up @@ -893,6 +959,12 @@ impl std::fmt::Debug for Recovery {
self.congestion_recovery_start_time
)?;
write!(f, "{:?} ", self.delivery_rate)?;
write!(f, "pacing_rate={:?}", self.pacing_rate)?;
write!(
f,
"last_packet_scheduled_time={:?}",
self.last_packet_scheduled_time
)?;

if self.hystart.enabled() {
write!(f, "hystart={:?} ", self.hystart)?;
Expand Down Expand Up @@ -1556,6 +1628,142 @@ mod tests {
// Spurious loss.
assert_eq!(r.lost_count, 1);
}

#[test]
fn pacing() {
let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);

let mut r = Recovery::new(&cfg);

let mut now = Instant::now();

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);

// send out first packet.
let p = Sent {
pkt_num: 0,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
assert_eq!(r.bytes_in_flight, 6500);

// First packet will be sent out immidiately.
assert_eq!(r.pacing_rate, 0);
assert_eq!(r.get_packet_send_time().unwrap(), now);

// Wait 50ms for ACK.
now += Duration::from_millis(50);

let mut acked = ranges::RangeSet::default();
acked.insert(0..1);

assert_eq!(
r.on_ack_received(
&acked,
10,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
""
),
Ok(())
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
assert_eq!(r.bytes_in_flight, 0);
assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

// Send out second packet.
let p = Sent {
pkt_num: 1,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
assert_eq!(r.bytes_in_flight, 6500);

// Pacing is not done during intial phase of connection.
assert_eq!(r.get_packet_send_time().unwrap(), now);

// Send the third packet out.
let p = Sent {
pkt_num: 2,
frames: vec![],
time_sent: now,
time_acked: None,
time_lost: None,
size: 6500,
ack_eliciting: true,
in_flight: true,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data: false,
};

r.on_packet_sent(
p,
packet::EPOCH_APPLICATION,
HandshakeStatus::default(),
now,
"",
);

assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
assert_eq!(r.bytes_in_flight, 13000);
assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

// We pace this outgoing packet. as all conditions for pacing
// are passed.
assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64);
assert_eq!(
r.get_packet_send_time().unwrap(),
now + Duration::from_micros(
(6500 * 1000000) / (12000.0 / 0.05) as u64
)
);
}
}

mod cubic;
Expand Down
5 changes: 5 additions & 0 deletions src/recovery/reno.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub static RENO: CongestionControlOps = CongestionControlOps {
collapse_cwnd,
checkpoint,
rollback,
has_custom_pacing,
};

pub fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) {
Expand Down Expand Up @@ -162,6 +163,10 @@ fn checkpoint(_r: &mut Recovery) {}

fn rollback(_r: &mut Recovery) {}

fn has_custom_pacing() -> bool {
false
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down