Skip to content

Commit

Permalink
Implement goog_cc inspired LossController
Browse files Browse the repository at this point in the history
Ports relevant parts for the V2 loss controller from libWebRTC. This is
based on an older commit to align with the current delay implementation.

Co-Authored-By: Hugo Tunius <h@tunius.se>
  • Loading branch information
davibe and k0nserv committed Nov 5, 2024
1 parent 8e3c5ca commit 87df756
Show file tree
Hide file tree
Showing 12 changed files with 1,878 additions and 169 deletions.
43 changes: 38 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ mod packet;

#[path = "rtp/mod.rs"]
mod rtp_;
use rtp_::Bitrate;
use rtp_::{Bitrate, DataSize};
use rtp_::{Extension, ExtensionMap};

/// Low level RTP access.
Expand Down Expand Up @@ -1830,7 +1830,7 @@ pub struct RtcConfig {
exts: ExtensionMap,
stats_interval: Option<Duration>,
/// Whether to use Bandwidth Estimation to discover the egress bandwidth.
bwe_initial_bitrate: Option<Bitrate>,
bwe_config: Option<BweConfig>,
reordering_size_audio: usize,
reordering_size_video: usize,
send_buffer_audio: usize,
Expand All @@ -1839,6 +1839,12 @@ pub struct RtcConfig {
enable_raw_packets: bool,
}

#[derive(Debug, Clone)]
struct BweConfig {
initial_bitrate: Bitrate,
enable_loss_controller: bool,
}

impl RtcConfig {
/// Creates a new default config.
pub fn new() -> Self {
Expand Down Expand Up @@ -2079,7 +2085,25 @@ impl RtcConfig {
///
/// This includes setting the initial estimate to start with.
pub fn enable_bwe(mut self, initial_estimate: Option<Bitrate>) -> Self {
self.bwe_initial_bitrate = initial_estimate;
match initial_estimate {
Some(b) => {
let conf = self.bwe_config.get_or_insert(BweConfig::new(b));
conf.initial_bitrate = b;
}
None => {
self.bwe_config = None;
}
}

self
}

/// Enable the experimental loss based BWE subsystem.
/// Defaults to disabled for now, will be enabled by default in the future.
pub fn enable_experimental_loss_based_bwe(mut self, enabled: bool) -> Self {
if let Some(c) = &mut self.bwe_config {
c.enable_loss_controller = enabled;
}

self
}
Expand All @@ -2094,7 +2118,7 @@ impl RtcConfig {
/// assert_eq!(config.bwe_initial_bitrate(), None);
/// ```
pub fn bwe_initial_bitrate(&self) -> Option<Bitrate> {
self.bwe_initial_bitrate
self.bwe_config.as_ref().map(|c| c.initial_bitrate)
}

/// Sets the number of packets held back for reordering audio packets.
Expand Down Expand Up @@ -2265,6 +2289,15 @@ impl RtcConfig {
}
}

impl BweConfig {
fn new(initial_bitrate: Bitrate) -> Self {
Self {
initial_bitrate,
enable_loss_controller: false,
}
}
}

impl Default for RtcConfig {
fn default() -> Self {
Self {
Expand All @@ -2275,7 +2308,7 @@ impl Default for RtcConfig {
codec_config: CodecConfig::new_with_defaults(),
exts: ExtensionMap::standard(),
stats_interval: None,
bwe_initial_bitrate: None,
bwe_config: None,
reordering_size_audio: 15,
reordering_size_video: 30,
send_buffer_audio: 50,
Expand Down
16 changes: 16 additions & 0 deletions src/packet/bwe/arrival_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ mod test {
size: DataSize::ZERO,
local_send_time: now,
remote_recv_time: now + duration_us(10),
local_recv_time: now + duration_us(12),
}),
Belongs::Yes,
"Any packet should belong to an empty arrival group"
Expand All @@ -249,27 +250,31 @@ mod test {
size: DataSize::ZERO,
local_send_time: now,
remote_recv_time: now + duration_us(150),
local_recv_time: now + duration_us(200),
});

packets.push(AckedPacket {
seq_no: 1.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(50),
remote_recv_time: now + duration_us(225),
local_recv_time: now + duration_us(275),
});

packets.push(AckedPacket {
seq_no: 2.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(1005),
remote_recv_time: now + duration_us(1140),
local_recv_time: now + duration_us(1190),
});

packets.push(AckedPacket {
seq_no: 3.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(4995),
remote_recv_time: now + duration_us(5001),
local_recv_time: now + duration_us(5051),
});

// Should not belong
Expand All @@ -278,6 +283,7 @@ mod test {
size: DataSize::ZERO,
local_send_time: now + duration_us(5700),
remote_recv_time: now + duration_us(6000),
local_recv_time: now + duration_us(5750),
});

packets
Expand Down Expand Up @@ -307,27 +313,31 @@ mod test {
size: DataSize::ZERO,
local_send_time: now,
remote_recv_time: now + duration_us(150),
local_recv_time: now + duration_us(200),
});

packets.push(AckedPacket {
seq_no: 1.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(50),
remote_recv_time: now + duration_us(225),
local_recv_time: now + duration_us(275),
});

packets.push(AckedPacket {
seq_no: 2.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(1005),
remote_recv_time: now + duration_us(1140),
local_recv_time: now + duration_us(1190),
});

packets.push(AckedPacket {
seq_no: 3.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(4995),
remote_recv_time: now + duration_us(5001),
local_recv_time: now + duration_us(5051),
});

// Should be skipped
Expand All @@ -336,6 +346,7 @@ mod test {
size: DataSize::ZERO,
local_send_time: now + duration_us(5001),
remote_recv_time: now + duration_us(5000),
local_recv_time: now + duration_us(5050),
});

// Should not belong
Expand All @@ -344,6 +355,7 @@ mod test {
size: DataSize::ZERO,
local_send_time: now + duration_us(5700),
remote_recv_time: now + duration_us(6000),
local_recv_time: now + duration_us(6050),
});

packets
Expand Down Expand Up @@ -373,13 +385,15 @@ mod test {
size: DataSize::ZERO,
local_send_time: now,
remote_recv_time: now + duration_us(150),
local_recv_time: now + duration_us(200),
});

packets.push(AckedPacket {
seq_no: 1.into(),
size: DataSize::ZERO,
local_send_time: now + duration_us(50),
remote_recv_time: now + duration_us(225),
local_recv_time: now + duration_us(275),
});

packets.push(AckedPacket {
Expand All @@ -388,6 +402,7 @@ mod test {
local_send_time: now + duration_us(5152),
// Just less than 5ms inter arrival delta
remote_recv_time: now + duration_us(5224),
local_recv_time: now + duration_us(5274),
});

// Should not belong
Expand All @@ -396,6 +411,7 @@ mod test {
size: DataSize::ZERO,
local_send_time: now + duration_us(5700),
remote_recv_time: now + duration_us(6000),
local_recv_time: now + duration_us(6050),
});

packets
Expand Down
166 changes: 166 additions & 0 deletions src/packet/bwe/delay_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};

use crate::rtp_::Bitrate;
use crate::util::already_happened;

use super::arrival_group::ArrivalGroupAccumulator;
use super::rate_control::RateControl;
use super::trendline_estimator::TrendlineEstimator;
use super::{AckedPacket, BandwidthUsage};

const MAX_RTT_HISTORY_WINDOW: usize = 32;
const UPDATE_INTERVAL: Duration = Duration::from_millis(25);
/// The maximum time we keep updating our estimate without receiving a TWCC report.
const MAX_TWCC_GAP: Duration = Duration::from_millis(500);

/// Delay controller for googcc inspired BWE.
///
/// This controller attempts to estimate the available send bandwidth by looking at the variations
/// in packet arrival times for groups of packets sent together. Broadly, if the delay variation is
/// increasing this indicates overuse.
pub struct DelayController {
arrival_group_accumulator: ArrivalGroupAccumulator,
trendline_estimator: TrendlineEstimator,
rate_control: RateControl,
/// Last estimate produced, unlike [`next_estimate`] this will always have a value after the
/// first estimate.
last_estimate: Option<Bitrate>,
/// History of the max RTT derived for each TWCC report.
max_rtt_history: VecDeque<Duration>,
/// Calculated mean of max_rtt_history.
mean_max_rtt: Option<Duration>,

/// The next time we should poll.
next_timeout: Instant,
/// The last time we ingested a TWCC report.
last_twcc_report: Instant,
}

impl DelayController {
pub fn new(initial_bitrate: Bitrate) -> Self {
Self {
arrival_group_accumulator: ArrivalGroupAccumulator::default(),
trendline_estimator: TrendlineEstimator::new(20),
rate_control: RateControl::new(initial_bitrate, Bitrate::kbps(40), Bitrate::gbps(10)),
last_estimate: None,
max_rtt_history: VecDeque::default(),
mean_max_rtt: None,
next_timeout: already_happened(),
last_twcc_report: already_happened(),
}
}

/// Record a packet from a TWCC report.
pub(crate) fn update(
&mut self,
acked: &[AckedPacket],
acked_bitrate: Option<Bitrate>,
now: Instant,
) -> Option<Bitrate> {
let mut max_rtt = None;

for acked_packet in acked {
max_rtt = max_rtt.max(Some(acked_packet.rtt()));
if let Some(delay_variation) = self
.arrival_group_accumulator
.accumulate_packet(acked_packet)
{
crate::packet::bwe::macros::log_delay_variation!(delay_variation.delay_delta);

// Got a new delay variation, add it to the trendline
self.trendline_estimator
.add_delay_observation(delay_variation, now);
}
}

if let Some(rtt) = max_rtt {
self.add_max_rtt(rtt);
}

let new_hypothesis = self.trendline_estimator.hypothesis();

self.update_estimate(new_hypothesis, acked_bitrate, self.mean_max_rtt, now);
self.last_twcc_report = now;

self.last_estimate
}

pub(crate) fn poll_timeout(&self) -> Instant {
self.next_timeout
}

pub(crate) fn handle_timeout(&mut self, acked_bitrate: Option<Bitrate>, now: Instant) {
if !self.trendline_hypothesis_valid(now) {
// We haven't received a TWCC report in a while. The trendline hypothesis can
// no longer be considered valid. We need another TWCC report before we can update
// estimates.
let next_timeout_in = self
.mean_max_rtt
.unwrap_or(MAX_TWCC_GAP)
.min(UPDATE_INTERVAL);

// Set this even if we didn't update, otherwise we get stuck in a poll -> handle loop
// that starves the run loop.
self.next_timeout = now + next_timeout_in;
return;
}

self.update_estimate(
self.trendline_estimator.hypothesis(),
acked_bitrate,
self.mean_max_rtt,
now,
);
}

/// Get the latest estimate.
pub(crate) fn last_estimate(&self) -> Option<Bitrate> {
self.last_estimate
}

fn add_max_rtt(&mut self, max_rtt: Duration) {
while self.max_rtt_history.len() > MAX_RTT_HISTORY_WINDOW {
self.max_rtt_history.pop_front();
}
self.max_rtt_history.push_back(max_rtt);

let sum = self
.max_rtt_history
.iter()
.fold(Duration::ZERO, |acc, rtt| acc + *rtt);

self.mean_max_rtt = Some(sum / self.max_rtt_history.len() as u32);
}

fn update_estimate(
&mut self,
hypothesis: BandwidthUsage,
observed_bitrate: Option<Bitrate>,
mean_max_rtt: Option<Duration>,
now: Instant,
) {
if let Some(observed_bitrate) = observed_bitrate {
self.rate_control
.update(hypothesis.into(), observed_bitrate, mean_max_rtt, now);
let estimated_rate = self.rate_control.estimated_bitrate();

crate::packet::bwe::macros::log_bitrate_estimate!(estimated_rate.as_f64());
self.last_estimate = Some(estimated_rate);
}

// Set this even if we didn't update, otherwise we get stuck in a poll -> handle loop
// that starves the run loop.
self.next_timeout = now + UPDATE_INTERVAL;
}

/// Whether the current trendline hypothesis is valid i.e. not too old.
fn trendline_hypothesis_valid(&self, now: Instant) -> bool {
now.duration_since(self.last_twcc_report)
<= self
.mean_max_rtt
.map(|rtt| rtt * 2)
.unwrap_or(MAX_TWCC_GAP)
.min(UPDATE_INTERVAL * 2)
}
}
Loading

0 comments on commit 87df756

Please sign in to comment.