Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bitrate control with Twcc and Remb #265

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
node: node.clone(),
media: MediaConfig { webrtc_addrs: webrtc_addrs.clone() },
};
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(100), cfg, None);
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None);

Check warning on line 42 in bin/src/server/media.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/media.rs#L42

Added line #L42 was not covered by tests
}

let mut req_id_seed = 0;
Expand Down
19 changes: 15 additions & 4 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{marker::PhantomData, time::Instant};

use media_server_protocol::{
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
endpoint::{BitrateControlMode, PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName, TrackPriority},
media::MediaPacket,
transport::RpcResult,
};
Expand Down Expand Up @@ -31,7 +31,7 @@
pub enum EndpointRemoteTrackRes {}

pub enum EndpointLocalTrackReq {
Switch(Option<(PeerId, TrackName)>),
Switch(Option<(PeerId, TrackName, TrackPriority)>),
}

pub enum EndpointLocalTrackRes {
Expand Down Expand Up @@ -68,6 +68,7 @@
/// This is used for controlling the local track, which is sent from endpoint
pub enum EndpointLocalTrackEvent {
Media(MediaPacket),
DesiredBitrate(u64),
}

/// This is used for controlling the remote track, which is sent from endpoint
Expand All @@ -83,6 +84,11 @@
PeerTrackStopped(PeerId, TrackName),
RemoteMediaTrack(RemoteTrackId, EndpointRemoteTrackEvent),
LocalMediaTrack(LocalTrackId, EndpointLocalTrackEvent),
/// Egress est params
BweConfig {
current: u64,
desired: u64,
},
/// This session will be disconnect after some seconds
GoAway(u8, Option<String>),
}
Expand All @@ -109,6 +115,11 @@
Internal = 1,
}

pub struct EndpointCfg {
pub max_egress_bitrate: u32,
pub bitrate_control: BitrateControlMode,
}

pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
transport: T,
internal: EndpointInternal,
Expand All @@ -117,10 +128,10 @@
}

impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
pub fn new(transport: T) -> Self {
pub fn new(cfg: EndpointCfg, transport: T) -> Self {

Check warning on line 131 in packages/media_core/src/endpoint.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint.rs#L131

Added line #L131 was not covered by tests
Self {
transport,
internal: EndpointInternal::new(),
internal: EndpointInternal::new(cfg),

Check warning on line 134 in packages/media_core/src/endpoint.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint.rs#L134

Added line #L134 was not covered by tests
Comment on lines +131 to +134
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor for Endpoint is not covered by unit tests. Consider adding tests to cover this critical initialization logic.

Would you like me to help by writing some unit tests for this method?

switcher: TaskSwitcher::new(2),
_tmp: PhantomData::default(),
}
Expand Down
51 changes: 46 additions & 5 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
transport::{LocalTrackEvent, LocalTrackId, RemoteTrackEvent, RemoteTrackId, TransportEvent, TransportState, TransportStats},
};

use self::{local_track::EndpointLocalTrack, remote_track::EndpointRemoteTrack};
use self::{bitrate_allocator::BitrateAllocator, local_track::EndpointLocalTrack, remote_track::EndpointRemoteTrack};

use super::{middleware::EndpointMiddleware, EndpointEvent, EndpointReq, EndpointReqId, EndpointRes};
use super::{middleware::EndpointMiddleware, EndpointCfg, EndpointEvent, EndpointReq, EndpointReqId, EndpointRes};

mod bitrate_allocator;
mod local_track;
mod remote_track;

Expand All @@ -37,6 +38,7 @@
}

pub struct EndpointInternal {
cfg: EndpointCfg,
state: TransportState,
wait_join: Option<(RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe)>,
joined: Option<(ClusterRoomHash, RoomId, PeerId)>,
Expand All @@ -47,11 +49,13 @@
_middlewares: Vec<Box<dyn EndpointMiddleware>>,
queue: VecDeque<InternalOutput>,
switcher: TaskSwitcher,
bitrate_allocator: BitrateAllocator,
}

impl EndpointInternal {
pub fn new() -> Self {
pub fn new(cfg: EndpointCfg) -> Self {

Check warning on line 56 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L56

Added line #L56 was not covered by tests
Self {
cfg,

Check warning on line 58 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L58

Added line #L58 was not covered by tests
state: TransportState::Connecting,
wait_join: None,
joined: None,
Expand All @@ -62,10 +66,25 @@
_middlewares: Default::default(),
queue: Default::default(),
switcher: TaskSwitcher::new(2),
bitrate_allocator: BitrateAllocator::default(),

Check warning on line 69 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L69

Added line #L69 was not covered by tests
}
}

pub fn on_tick<'a>(&mut self, now: Instant) -> Option<InternalOutput> {
self.bitrate_allocator.on_tick();
if let Some(out) = self.bitrate_allocator.pop_output() {
match out {
bitrate_allocator::Output::SetTrackBitrate(track, bitrate) => {
if let Some(index) = self.local_tracks_id.get1(&track) {
let out = self.local_tracks.on_event(now, *index, local_track::Input::LimitBitrate(bitrate))?;
if let Some(out) = self.convert_local_track_output(now, track, out) {
return Some(out);
}
}

Check warning on line 83 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L74-L83

Added lines #L74 - L83 were not covered by tests
}
}
}

Check warning on line 86 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L86

Added line #L86 was not covered by tests
Comment on lines +74 to +86
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method on_tick modifies the state based on bitrate_allocator outputs but lacks unit tests covering these changes.

Consider adding tests to cover the scenarios where bitrate adjustments are made based on the allocator's output.


loop {
match self.switcher.looper_current(now)?.try_into().ok()? {
TaskType::LocalTracks => {
Expand Down Expand Up @@ -124,6 +143,12 @@
TransportEvent::RemoteTrack(track, event) => self.on_transport_remote_track(now, track, event),
TransportEvent::LocalTrack(track, event) => self.on_transport_local_track(now, track, event),
TransportEvent::Stats(stats) => self.on_transport_stats(now, stats),
TransportEvent::EgressBitrateEstimate(bitrate) => {
let bitrate2 = bitrate.min(self.cfg.max_egress_bitrate as u64);
log::debug!("[EndpointInternal] limit egress bitrate {bitrate2}, rewrite from {bitrate}");
self.bitrate_allocator.set_egress_bitrate(bitrate2);
None

Check warning on line 150 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L146-L150

Added lines #L146 - L150 were not covered by tests
Comment on lines +146 to +150
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handling of EgressBitrateEstimate in on_transport_event is not covered by tests, which is crucial for ensuring the bitrate control behaves as expected under different network conditions.

Would you like assistance in creating test cases for this event handling?

}
}
}

Expand Down Expand Up @@ -220,10 +245,10 @@
}

fn on_transport_local_track<'a>(&mut self, now: Instant, track: LocalTrackId, event: LocalTrackEvent) -> Option<InternalOutput> {
if event.need_create() {
if let Some(kind) = event.need_create() {

Check warning on line 248 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L248

Added line #L248 was not covered by tests
log::info!("[EndpointInternal] create local track {:?}", track);
let room = self.joined.as_ref().map(|j| j.0.clone());
let index = self.local_tracks.add_task(EndpointLocalTrack::new(room));
let index = self.local_tracks.add_task(EndpointLocalTrack::new(kind, room));

Check warning on line 251 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L251

Added line #L251 was not covered by tests
Comment on lines +248 to +251
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method on_transport_local_track creates a new local track but is not covered by unit tests, particularly the track creation logic.

It's important to ensure that track creation is handled correctly. Shall I help by adding some tests for this functionality?

self.local_tracks_id.insert(track, index);
}
let index = self.local_tracks_id.get1(&track)?;
Expand Down Expand Up @@ -333,6 +358,22 @@
local_track::Output::Event(event) => Some(InternalOutput::Event(EndpointEvent::LocalMediaTrack(id, event))),
local_track::Output::Cluster(room, control) => Some(InternalOutput::Cluster(room, ClusterEndpointControl::LocalTrack(id, control))),
local_track::Output::RpcRes(req_id, res) => Some(InternalOutput::RpcRes(req_id, EndpointRes::LocalTrack(id, res))),
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate * 1 / 5),
Fixed Show fixed Hide fixed

Check warning

Code scanning / clippy

this operation has no effect Warning

this operation has no effect
})),
local_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_video_track(id, priority);
}
None

Check warning on line 369 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L361-L369

Added lines #L361 - L369 were not covered by tests
}
local_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_video_track(id);
}
None

Check warning on line 375 in packages/media_core/src/endpoint/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal.rs#L371-L375

Added lines #L371 - L375 were not covered by tests
}
Comment on lines +361 to +376
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method convert_local_track_output handles various track events. However, the calculation in line 363 has no effect, which might be an oversight.

- desired: bitrate + 100_000.max(bitrate * 1 / 5),
+ desired: bitrate + 100_000.max(bitrate / 5),

This change ensures that the desired bitrate calculation is meaningful by correcting the division operation.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate * 1 / 5),
})),
local_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_video_track(id, priority);
}
None
}
local_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_video_track(id);
}
None
}
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate / 5),
})),
local_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_video_track(id, priority);
}
None
}
local_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_video_track(id);
}
None
}

}
}
}
Expand Down
90 changes: 90 additions & 0 deletions packages/media_core/src/endpoint/internal/bitrate_allocator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use derivative::Derivative;
use std::collections::VecDeque;

use media_server_protocol::endpoint::TrackPriority;

use crate::transport::LocalTrackId;

const DEFAULT_BITRATE_BPS: u64 = 800_000;

#[derive(Debug, PartialEq, Eq)]
pub enum Output {
SetTrackBitrate(LocalTrackId, u64),
}

#[derive(Derivative)]
#[derivative(Default)]
pub struct BitrateAllocator {
changed: bool,
#[derivative(Default(value = "DEFAULT_BITRATE_BPS"))]
egress_bitrate: u64,
tracks: smallmap::Map<LocalTrackId, TrackPriority>,
queue: VecDeque<Output>,
}

impl BitrateAllocator {
pub fn on_tick(&mut self) {
self.process();
}

pub fn set_egress_bitrate(&mut self, bitrate: u64) {
self.egress_bitrate = bitrate;
self.changed = true;
}

Check warning on line 33 in packages/media_core/src/endpoint/internal/bitrate_allocator.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/bitrate_allocator.rs#L30-L33

Added lines #L30 - L33 were not covered by tests

pub fn set_video_track(&mut self, track: LocalTrackId, priority: TrackPriority) {
self.tracks.insert(track, priority);
self.changed = true;
}

pub fn del_video_track(&mut self, track: LocalTrackId) {
self.tracks.remove(&track);
self.changed = true;
}

Check warning on line 43 in packages/media_core/src/endpoint/internal/bitrate_allocator.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/bitrate_allocator.rs#L40-L43

Added lines #L40 - L43 were not covered by tests

pub fn pop_output(&mut self) -> Option<Output> {
self.queue.pop_front()
}

fn process(&mut self) {
if !self.changed {
return;

Check warning on line 51 in packages/media_core/src/endpoint/internal/bitrate_allocator.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/bitrate_allocator.rs#L51

Added line #L51 was not covered by tests
}
self.changed = false;
let mut sum = TrackPriority(0);
for (_track, priority) in self.tracks.iter() {
sum = sum + *priority;

Check warning

Code scanning / clippy

manual implementation of an assign operation Warning

manual implementation of an assign operation
}

if *(sum.as_ref()) != 0 {
for (track, priority) in self.tracks.iter() {
self.queue.push_back(Output::SetTrackBitrate(*track, (self.egress_bitrate * priority.0 as u64) / sum.0 as u64));
}
}

Check warning on line 63 in packages/media_core/src/endpoint/internal/bitrate_allocator.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/bitrate_allocator.rs#L63

Added line #L63 was not covered by tests
}
}

#[cfg(test)]
mod test {
use super::{BitrateAllocator, Output, DEFAULT_BITRATE_BPS};

#[test]
fn single_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS)));
}

#[test]
fn multi_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());
allocator.set_video_track(1.into(), 3.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS * 1 / 4)));
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(1.into(), DEFAULT_BITRATE_BPS * 3 / 4)));
}
}
Loading
Loading