Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cluster metadata publish and subscribe options: peer and track info #260

Merged
merged 5 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{

use atm0s_sdn::features::{FeaturesControl, FeaturesEvent};
use media_server_protocol::{
endpoint::{PeerId, RoomId, TrackMeta, TrackName},
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
media::MediaPacket,
};

Expand Down Expand Up @@ -43,7 +43,7 @@ pub enum ClusterRemoteTrackControl {
Ended,
}

#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterRemoteTrackEvent {
RequestKeyFrame,
LimitBitrate { min: u32, max: u32 },
Expand All @@ -57,39 +57,28 @@ pub enum ClusterLocalTrackControl {
Unsubscribe,
}

#[derive(Debug, Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterLocalTrackEvent {
Started,
SourceChanged,
Media(MediaPacket),
Ended,
}

#[derive(Debug)]
pub enum ClusterRoomInfoPublishLevel {
Full,
TrackOnly,
}

#[derive(Debug)]
pub enum ClusterRoomInfoSubscribeLevel {
Full,
TrackOnly,
Manual,
}

#[derive(Debug)]
pub enum ClusterEndpointControl {
Join(PeerId, ClusterRoomInfoPublishLevel, ClusterRoomInfoSubscribeLevel),
Join(PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe),
Leave,
SubscribePeer(PeerId),
UnsubscribePeer(PeerId),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackControl),
LocalTrack(LocalTrackId, ClusterLocalTrackControl),
}

#[derive(Clone)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterEndpointEvent {
PeerJoined(PeerId, PeerMeta),
PeerLeaved(PeerId),
TrackStarted(PeerId, TrackName, TrackMeta),
TrackStopped(PeerId, TrackName),
RemoteTrack(RemoteTrackId, ClusterRemoteTrackEvent),
Expand Down
6 changes: 3 additions & 3 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@

fn on_endpoint_control(&mut self, now: Instant, owner: Owner, control: ClusterEndpointControl) -> Option<Output<Owner>> {
match control {
ClusterEndpointControl::Join(peer, publish, subscribe) => {
let out = self.metadata.on_join(owner, peer, publish, subscribe)?;
ClusterEndpointControl::Join(peer, meta, publish, subscribe) => {
let out = self.metadata.on_join(owner, peer, meta, publish, subscribe)?;

Check warning on line 144 in packages/media_core/src/cluster/room.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room.rs#L143-L144

Added lines #L143 - L144 were not covered by tests
Some(self.process_meta_output(out))
}
ClusterEndpointControl::Leave => {
Expand Down Expand Up @@ -230,7 +230,7 @@
}
}

pub fn track_key<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {

Check warning on line 233 in packages/media_core/src/cluster/room.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room.rs#L233

Added line #L233 was not covered by tests
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
Expand Down
7 changes: 2 additions & 5 deletions packages/media_core/src/cluster/room/channel_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,13 @@
let (owner, track_id) = self.tracks_source.get(&channel)?;
match fb_kind {
FeedbackKind::Bitrate => todo!(),
FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(
vec![owner.clone()],
ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame),
)),
FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(vec![*owner], ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame))),

Check warning on line 49 in packages/media_core/src/cluster/room/channel_pub.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room/channel_pub.rs#L49

Added line #L49 was not covered by tests
}
}

pub fn on_track_publish(&mut self, owner: Owner, track: RemoteTrackId, peer: PeerId, name: TrackName) -> Option<Output<Owner>> {
log::info!("[ClusterRoom {}] peer ({peer} started track {name})", self.room);
let channel_id = super::track_key(self.room, &peer, &name);
let channel_id = super::gen_channel_id(self.room, &peer, &name);

Check warning on line 55 in packages/media_core/src/cluster/room/channel_pub.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room/channel_pub.rs#L55

Added line #L55 was not covered by tests
self.tracks.insert((owner, track), (peer.clone(), name.clone(), channel_id));
self.tracks_source.insert(channel_id, (owner, track));

Expand Down
10 changes: 4 additions & 6 deletions packages/media_core/src/cluster/room/channel_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
log::info!("[ClusterRoom {}] cluster: channel {channel} source changed => fire event to {:?}", self.room, subscribers);
for (owner, track) in subscribers {
self.queue
.push_back(Output::Endpoint(vec![owner.clone()], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))
.push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged)))

Check warning on line 52 in packages/media_core/src/cluster/room/channel_sub.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room/channel_sub.rs#L52

Added line #L52 was not covered by tests
}
self.queue.pop_front()
}
Expand All @@ -59,16 +59,14 @@
let subscribers = self.subscribers.get(&channel)?;
log::trace!("[ClusterRoom {}] on channel media payload {} seq {} to {} subscribers", self.room, pkt.pt, pkt.seq, subscribers.len());
for (owner, track) in subscribers {
self.queue.push_back(Output::Endpoint(
vec![owner.clone()],
ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone())),
))
self.queue
.push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone()))))

Check warning on line 63 in packages/media_core/src/cluster/room/channel_sub.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room/channel_sub.rs#L62-L63

Added lines #L62 - L63 were not covered by tests
}
self.queue.pop_front()
}

pub fn on_track_subscribe(&mut self, owner: Owner, track: LocalTrackId, target_peer: PeerId, target_track: TrackName) -> Option<Output<Owner>> {
let channel_id: ChannelId = super::track_key(self.room, &target_peer, &target_track);
let channel_id: ChannelId = super::gen_channel_id(self.room, &target_peer, &target_track);

Check warning on line 69 in packages/media_core/src/cluster/room/channel_sub.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_core/src/cluster/room/channel_sub.rs#L69

Added line #L69 was not covered by tests
log::info!(
"[ClusterRoom {}] owner {:?} track {track} subscribe peer {target_peer} track {target_track}), channel: {channel_id}",
self.room,
Expand Down
Loading
Loading