Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed: server crash because wrong ordered of remote stream destroy messages #380

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,21 +474,30 @@ impl EndpointInternal {
}
}

impl Drop for EndpointInternal {
fn drop(&mut self) {
assert_eq!(self.queue.len(), 0, "endpoint internal queue should empty on drop");
}
}

#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr},
time::Instant,
};

use media_server_protocol::endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe};
use media_server_protocol::protobuf::cluster_connector::peer_event;
use media_server_protocol::{
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta},
protobuf::shared::Kind,
};
use sans_io_runtime::TaskSwitcherChild;

use crate::{
cluster::{ClusterEndpointControl, ClusterRoomHash},
cluster::{ClusterEndpointControl, ClusterRemoteTrackControl, ClusterRoomHash},
endpoint::{internal::InternalOutput, EndpointCfg, EndpointReq, EndpointRes},
transport::{TransportEvent, TransportState},
transport::{RemoteTrackEvent, TransportEvent, TransportState},
};

use super::EndpointInternal;
Expand Down Expand Up @@ -546,6 +555,60 @@ mod tests {
);
assert_eq!(internal.pop_output(now), None);

//now start a remote track
let remote_track_id = 0.into();
let remote_track_meta = TrackMeta::default_audio();
internal.on_transport_event(
now,
TransportEvent::RemoteTrack(
remote_track_id,
RemoteTrackEvent::Started {
name: "audio_main".into(),
priority: 100.into(),
meta: remote_track_meta.clone(),
},
),
);
assert_eq!(
internal.pop_output(now),
Some(InternalOutput::Cluster(
room_hash,
ClusterEndpointControl::RemoteTrack(remote_track_id, ClusterRemoteTrackControl::Started("audio_main".into(), remote_track_meta.clone()))
))
);
assert_eq!(
internal.pop_output(now),
Some(InternalOutput::PeerEvent(
now,
peer_event::Event::RemoteTrackStarted(peer_event::RemoteTrackStarted {
track: "audio_main".to_string(),
kind: Kind::from(remote_track_meta.kind) as i32,
}),
))
);
assert_eq!(internal.pop_output(now), None);

//now stop remote track
internal.on_transport_event(now, TransportEvent::RemoteTrack(remote_track_id, RemoteTrackEvent::Ended));
assert_eq!(
internal.pop_output(now),
Some(InternalOutput::Cluster(
room_hash,
ClusterEndpointControl::RemoteTrack(remote_track_id, ClusterRemoteTrackControl::Ended("audio_main".into(), remote_track_meta.clone()))
))
);
assert_eq!(
internal.pop_output(now),
Some(InternalOutput::PeerEvent(
now,
peer_event::Event::RemoteTrackEnded(peer_event::RemoteTrackEnded {
track: "audio_main".to_string(),
kind: Kind::from(remote_track_meta.kind) as i32,
}),
))
);
assert_eq!(internal.pop_output(now), None);

//now leave room should success
internal.on_transport_rpc(now, 1.into(), EndpointReq::LeaveRoom);
assert_eq!(internal.pop_output(now), Some(InternalOutput::RpcRes(1.into(), EndpointRes::LeaveRoom(Ok(())))));
Expand Down Expand Up @@ -671,6 +734,7 @@ mod tests {
//TODO single local track, join leave room
//TODO multi local tracks, join leave room
//TODO single remote track, join leave room

//TODO multi remote tracks, join leave room
//TODO both local and remote tracks, join leave room
//TODO test local and remote stopped must clear resource
Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/endpoint/internal/local_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@

impl Drop for EndpointLocalTrack {
fn drop(&mut self) {
assert_eq!(self.queue.len(), 0);
assert_eq!(self.queue.len(), 0, "local track queue should empty on drop");

Check warning on line 310 in packages/media_core/src/endpoint/internal/local_track.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/endpoint/internal/local_track.rs#L310

Added line #L310 was not covered by tests
}
}

Expand Down
74 changes: 70 additions & 4 deletions packages/media_core/src/endpoint/internal/remote_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum Input {
BitrateAllocation(IngressAction),
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum Output {
Event(EndpointRemoteTrackEvent),
Cluster(ClusterRoomHash, ClusterRemoteTrackControl),
Expand Down Expand Up @@ -170,7 +170,6 @@ impl EndpointRemoteTrack {
let room = return_if_none!(self.room.as_ref());
log::info!("[EndpointRemoteTrack] stopped with name {name} in room {room}");
self.queue.push_back(Output::Cluster(*room, ClusterRemoteTrackControl::Ended(name.clone(), self.meta.clone())));
self.queue.push_back(Output::Stopped(self.meta.kind));
if self.record {
self.queue.push_back(Output::RecordEvent(now, SessionRecordEvent::TrackStopped(self.id)));
}
Expand All @@ -181,6 +180,8 @@ impl EndpointRemoteTrack {
kind: Kind::from(self.meta.kind) as i32,
}),
));
// We must send Stopped at last, if not we missed some event
self.queue.push_back(Output::Stopped(self.meta.kind));
}
}
}
Expand Down Expand Up @@ -250,13 +251,78 @@ impl TaskSwitcherChild<Output> for EndpointRemoteTrack {

impl Drop for EndpointRemoteTrack {
fn drop(&mut self) {
assert_eq!(self.queue.len(), 0);
assert_eq!(self.queue.len(), 0, "remote track queue should empty on drop");
}
}

#[cfg(test)]
mod tests {
//TODO start in room
use std::time::{Duration, Instant};

use media_server_protocol::{
endpoint::{TrackMeta, TrackName},
protobuf::{cluster_connector::peer_event, shared::Kind},
};
use sans_io_runtime::{Task, TaskSwitcherChild};

use crate::{cluster::ClusterRemoteTrackControl, transport::RemoteTrackEvent};

use super::{EndpointRemoteTrack, Input, Output};

#[test]
fn start_in_room() {
let room = 0.into();
let track_name = TrackName("audio_main".to_string());
let track_id = 1.into();
let track_priority = 2.into();
let meta = TrackMeta::default_audio();
let now = Instant::now();
let mut track = EndpointRemoteTrack::new(Some(room), track_id, meta.clone(), false);
assert_eq!(track.pop_output(now), None);

track.on_event(
now,
Input::Event(RemoteTrackEvent::Started {
name: track_name.0.clone(),
priority: track_priority,
meta: meta.clone(),
}),
);

assert_eq!(track.pop_output(now), Some(Output::Cluster(room, ClusterRemoteTrackControl::Started(track_name.clone(), meta.clone()))));
assert_eq!(track.pop_output(now), Some(Output::Started(meta.kind, track_priority)));
assert_eq!(
track.pop_output(now),
Some(Output::PeerEvent(
now,
peer_event::Event::RemoteTrackStarted(peer_event::RemoteTrackStarted {
track: track_name.0.clone(),
kind: Kind::from(meta.kind) as i32,
}),
))
);
assert_eq!(track.pop_output(now), None);

//now leave room
let now = now + Duration::from_secs(1);
track.on_event(now, Input::Event(RemoteTrackEvent::Ended));

assert_eq!(track.pop_output(now), Some(Output::Cluster(room, ClusterRemoteTrackControl::Ended(track_name.clone(), meta.clone()))));
assert_eq!(
track.pop_output(now),
Some(Output::PeerEvent(
now,
peer_event::Event::RemoteTrackEnded(peer_event::RemoteTrackEnded {
track: track_name.0.clone(),
kind: Kind::from(meta.kind) as i32,
}),
))
);
//we need Output::Stopped at last
assert_eq!(track.pop_output(now), Some(Output::Stopped(meta.kind)));
assert_eq!(track.pop_output(now), None);
}

//TODO start not in room
//TODO stop in room
//TODO stop not in room
Expand Down
Loading