feat: channel pub-sub feature and tests. cluster integration test #262

Merged · 2 commits · Apr 24, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -37,7 +37,7 @@ For a deep dive into the technical aspects of network architecture, please refer

## Project Status: Refactoring

We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).
We are actively refactoring entire media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for an older version, please check out the [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).

## Features

119 changes: 103 additions & 16 deletions packages/media_core/src/cluster.rs
@@ -5,7 +5,7 @@
//!

use derive_more::{AsRef, Display, From};
use sans_io_runtime::{Task, TaskGroup};
use sans_io_runtime::TaskGroup;
use std::{
collections::HashMap,
fmt::Debug,
@@ -23,6 +23,7 @@ use crate::transport::{LocalTrackId, RemoteTrackId};

use self::room::ClusterRoom;

mod id_generator;
mod room;

#[derive(Clone, Copy, From, AsRef, PartialEq, Eq, Debug, Display, Hash)]
@@ -46,14 +47,14 @@ pub enum ClusterRemoteTrackControl {
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterRemoteTrackEvent {
RequestKeyFrame,
LimitBitrate { min: u32, max: u32 },
LimitBitrate { min: u64, max: u64 },
}

#[derive(Debug, Clone)]
pub enum ClusterLocalTrackControl {
Subscribe(PeerId, TrackName),
RequestKeyFrame,
DesiredBitrate(u32),
DesiredBitrate(u64),
Unsubscribe,
}

@@ -90,6 +91,7 @@ pub enum Input<Owner> {
Endpoint(Owner, ClusterRoomHash, ClusterEndpointControl),
}

#[derive(Debug, PartialEq, Eq)]
pub enum Output<Owner> {
Sdn(ClusterRoomHash, FeaturesControl),
Endpoint(Vec<Owner>, ClusterEndpointEvent),
@@ -98,7 +100,7 @@ pub enum Output<Owner> {

pub struct MediaCluster<Owner: Debug + Copy + Clone + Hash + Eq> {
rooms_map: HashMap<ClusterRoomHash, usize>,
rooms: TaskGroup<room::Input<Owner>, Output<Owner>, ClusterRoom<Owner>, 128>,
rooms: TaskGroup<room::Input<Owner>, room::Output<Owner>, ClusterRoom<Owner>, 128>,
}

impl<Owner: Debug + Copy + Hash + Eq + Clone> Default for MediaCluster<Owner> {
@@ -112,41 +114,126 @@ impl<Owner: Debug + Copy + Hash + Eq + Clone> Default for MediaCluster<Owner> {

impl<Owner: Debug + Hash + Copy + Clone + Debug + Eq> MediaCluster<Owner> {
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.on_tick(now)?;
Some(out)
let (index, out) = self.rooms.on_tick(now)?;
Some(self.process_room_output(index, out))
}

pub fn on_sdn_event(&mut self, now: Instant, room: ClusterRoomHash, event: FeaturesEvent) -> Option<Output<Owner>> {
let index = self.rooms_map.get(&room)?;
self.rooms.on_event(now, *index, room::Input::Sdn(event))
let out = self.rooms.on_event(now, *index, room::Input::Sdn(event))?;
Some(self.process_room_output(*index, out))
}

pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, room_hash: ClusterRoomHash, control: ClusterEndpointControl) -> Option<Output<Owner>> {
if let Some(index) = self.rooms_map.get(&room_hash) {
self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))
let out = self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))?;
Some(self.process_room_output(*index, out))
} else {
log::info!("[MediaCluster] create room {}", room_hash);
let mut room = ClusterRoom::new(room_hash);
let out = room.on_event(now, room::Input::Endpoint(owner, control));
let index = self.rooms.add_task(room);
let index = self.rooms.add_task(ClusterRoom::new(room_hash));
self.rooms_map.insert(room_hash, index);
out
let out = self.rooms.on_event(now, index, room::Input::Endpoint(owner, control))?;
Some(self.process_room_output(index, out))
}
}

pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.pop_output(now)?;
Some(out)
let (index, out) = self.rooms.pop_output(now)?;
Some(self.process_room_output(index, out))
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.shutdown(now)?;
Some(out)
let (index, out) = self.rooms.shutdown(now)?;
Some(self.process_room_output(index, out))
}

fn process_room_output(&mut self, index: usize, out: room::Output<Owner>) -> Output<Owner> {
match out {
room::Output::Sdn(userdata, control) => Output::Sdn(userdata, control),
room::Output::Endpoint(owners, event) => Output::Endpoint(owners, event),
room::Output::Destroy => {
self.rooms.remove_task(index);
Output::Continue
}
}
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;

use atm0s_sdn::features::{
dht_kv::{self, MapControl, MapEvent},
FeaturesControl, FeaturesEvent,
};
use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe};

use crate::cluster::{id_generator, ClusterEndpointEvent};

use super::{ClusterEndpointControl, ClusterRoomHash, MediaCluster, Output};

//TODO should create room when a new room event arrives
//TODO should route to the correct room
//TODO should remove room after all peers have left
#[test]
fn room_manager_should_work() {
let mut cluster = MediaCluster::<u8>::default();

let owner = 1;
let room_hash = ClusterRoomHash(1);
let room_peers_map = id_generator::peers_map(room_hash);
let peer = PeerId("peer1".to_string());
let peer_key = id_generator::peers_key(&peer);
let peer_info = PeerInfo::new(peer.clone(), PeerMeta {});

// Now join room with scope (peer true, track false), which should Set and Sub
let out = cluster.on_endpoint_control(
Instant::now(),
owner,
room_hash,
ClusterEndpointControl::Join(
peer.clone(),
peer_info.meta.clone(),
RoomInfoPublish { peer: true, tracks: false },
RoomInfoSubscribe { peers: true, tracks: false },
),
);
assert_eq!(
out,
Some(Output::Sdn(
room_hash,
FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Set(peer_key, peer_info.serialize())))
))
);
assert_eq!(
cluster.pop_output(Instant::now()),
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Sub))))
);
assert_eq!(cluster.pop_output(Instant::now()), None);
assert_eq!(cluster.rooms.tasks(), 1);

// Correct forward to room
let out = cluster.on_sdn_event(
Instant::now(),
room_hash,
FeaturesEvent::DhtKv(dht_kv::Event::MapEvent(room_peers_map, MapEvent::OnSet(peer_key, 1, peer_info.serialize()))),
);
assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(peer.clone(), peer_info.meta.clone()))));
assert_eq!(cluster.pop_output(Instant::now()), None);

// Now leave room should Del and Unsub
let out = cluster.on_endpoint_control(Instant::now(), owner, room_hash, ClusterEndpointControl::Leave);
assert_eq!(
out,
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Del(peer_key)))))
);
assert_eq!(
cluster.pop_output(Instant::now()),
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Unsub))))
);
assert_eq!(cluster.pop_output(Instant::now()), Some(Output::Continue)); // this Continue corresponds to the room's Destroy event
assert_eq!(cluster.pop_output(Instant::now()), None);
assert_eq!(cluster.rooms.tasks(), 0);
}
}
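
The new Output::Continue variant lets the cluster swallow internal bookkeeping (such as a room's Destroy) without surfacing a meaningless event to callers. A minimal sketch of a caller draining outputs; the handler stubs in the comments are hypothetical and not part of this PR:

```rust
use std::{fmt::Debug, hash::Hash, time::Instant};

// Sketch only: the commented-out handlers are hypothetical stand-ins.
fn drain<Owner: Debug + Copy + Clone + Hash + Eq>(cluster: &mut MediaCluster<Owner>) {
    while let Some(out) = cluster.pop_output(Instant::now()) {
        match out {
            // Forward DHT-KV/pubsub controls for the room to the SDN layer.
            Output::Sdn(_room, _control) => { /* sdn_send(_room, _control) */ }
            // Fan the event out to every owning endpoint.
            Output::Endpoint(_owners, _event) => { /* endpoint_notify(_owners, _event) */ }
            // Internal bookkeeping (e.g. a room was just destroyed); nothing to forward.
            Output::Continue => {}
        }
    }
}
```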
42 changes: 42 additions & 0 deletions packages/media_core/src/cluster/id_generator.rs
@@ -0,0 +1,42 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use atm0s_sdn::features::dht_kv::{Key, Map};
use media_server_protocol::endpoint::{PeerId, TrackName};

use super::ClusterRoomHash;

pub fn peer_map(room: ClusterRoomHash, peer: &PeerId) -> Map {
let mut h = DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
h.finish().into()
}

pub fn peers_map(room: ClusterRoomHash) -> Map {
room.0.into()
}

pub fn peers_key(peer: &PeerId) -> Key {
let mut h = DefaultHasher::new();
peer.as_ref().hash(&mut h);
h.finish().into()
}

pub fn tracks_map(room: ClusterRoomHash) -> Map {
(room.0 + 1).into()
}

pub fn tracks_key(peer: &PeerId, track: &TrackName) -> Key {
let mut h = DefaultHasher::new();
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}

pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}
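
A short usage sketch of these generators, mirroring the unit test in cluster.rs above; the concrete values are illustrative only:

```rust
use atm0s_sdn::features::dht_kv::{Key, Map};
use media_server_protocol::endpoint::PeerId;

let room = ClusterRoomHash(1);
let peer = PeerId("peer1".to_string());

let peers_map: Map = id_generator::peers_map(room); // Map(room.0): room-wide peer directory
let peer_key: Key = id_generator::peers_key(&peer); // hash of the peer id, stable within a build
let tracks_map: Map = id_generator::tracks_map(room); // Map(room.0 + 1): room-wide track directory
```

Because every node derives the same map and key ids independently from the room hash and peer id, publishers and subscribers agree on DHT-KV locations without extra signaling.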
55 changes: 25 additions & 30 deletions packages/media_core/src/cluster/room.rs
@@ -7,39 +7,32 @@
//! - Send/Receive pubsub channel
//!

use std::{
collections::VecDeque,
fmt::Debug,
hash::{Hash, Hasher},
time::Instant,
};
use std::{collections::VecDeque, fmt::Debug, hash::Hash, time::Instant};

use atm0s_sdn::features::{dht_kv, pubsub, FeaturesControl, FeaturesEvent};
use media_server_protocol::endpoint::{PeerId, TrackName};
use sans_io_runtime::{Task, TaskSwitcher};

use crate::transport::{LocalTrackId, RemoteTrackId};

use self::{channel_pub::RoomChannelPublisher, channel_sub::RoomChannelSubscribe, metadata::RoomMetadata};

use super::{ClusterEndpointControl, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash, Output};
use super::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash};

mod channel_pub;
mod channel_sub;
mod metadata;

#[derive(num_enum::TryFromPrimitive)]
#[repr(u8)]
pub enum FeedbackKind {
Bitrate = 0,
KeyFrameRequest = 1,
}

pub enum Input<Owner> {
Sdn(FeaturesEvent),
Endpoint(Owner, ClusterEndpointControl),
}

pub enum Output<Owner> {
Sdn(ClusterRoomHash, FeaturesControl),
Endpoint(Vec<Owner>, ClusterEndpointEvent),
Destroy,
}

#[derive(num_enum::TryFromPrimitive)]
#[repr(usize)]
enum TaskType {
@@ -55,6 +48,7 @@ pub struct ClusterRoom<Owner> {
subscriber: RoomChannelSubscribe<Owner>,
switcher: TaskSwitcher,
queue: VecDeque<Output<Owner>>,
destroyed: bool, // guards against emitting the Destroy output more than once
}

impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>> for ClusterRoom<Owner> {
@@ -73,8 +67,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>>
if let Some(out) = self.queue.pop_front() {
return Some(out);
}
loop {
match self.switcher.queue_current()?.try_into().ok()? {
while let Some(c) = self.switcher.queue_current() {
match c.try_into().ok()? {
TaskType::Metadata => {
if let Some(out) = self.switcher.queue_process(self.metadata.pop_output(now)) {
return Some(self.process_meta_output(out));
Expand All @@ -92,6 +86,14 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>>
}
}
}

if self.metadata.peers() == 0 && !self.destroyed {
log::info!("[ClusterRoom {}] leave last peer => remove room", self.room);
self.destroyed = true;
Some(Output::Destroy)
} else {
None
}
}

fn shutdown(&mut self, _now: Instant) -> Option<Output<Owner>> {
@@ -108,6 +110,7 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
subscriber: RoomChannelSubscribe::new(room),
switcher: TaskSwitcher::new(3),
queue: VecDeque::new(),
destroyed: false,
}
}

@@ -145,8 +148,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
Some(self.process_meta_output(out))
}
ClusterEndpointControl::Leave => {
let out = self.metadata.on_leave(owner)?;
Some(self.process_meta_output(out))
let out = self.metadata.on_leave(owner);
Some(self.process_meta_output(out?))
}
ClusterEndpointControl::SubscribePeer(target) => {
let out = self.metadata.on_subscribe_peer(owner, target)?;
@@ -195,11 +198,11 @@ impl<Owner: Debug + Clone + Copy + Hash + Eq> ClusterRoom<Owner> {
}
}

fn control_local_track(&mut self, _now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option<Output<Owner>> {
fn control_local_track(&mut self, now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option<Output<Owner>> {
let out = match control {
ClusterLocalTrackControl::Subscribe(target_peer, target_track) => self.subscriber.on_track_subscribe(owner, track_id, target_peer, target_track),
ClusterLocalTrackControl::RequestKeyFrame => self.subscriber.on_track_request_key(owner, track_id),
ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(owner, track_id, bitrate),
ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(now, owner, track_id, bitrate),
ClusterLocalTrackControl::Unsubscribe => self.subscriber.on_track_unsubscribe(owner, track_id),
}?;
Some(self.process_subscriber_output(out))
@@ -230,14 +233,6 @@ impl<Owner: Debug + Clone + Copy + Hash + Eq> ClusterRoom<Owner> {
}
}

pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}

#[cfg(test)]
mod tests {
//TODO join room should set key-value and SUB to maps
@@ -247,7 +242,7 @@ mod tests {
//TODO track feedback should fire event to endpoint
//TODO track stopped should DEL key-value and pubsub STOP
//TODO subscribe track should SUB channel
//TODO fedback track should FEEDBACK channel
//TODO feedback track should FEEDBACK channel
//TODO channel data should fire event to endpoint
//TODO unsubscribe track should UNSUB channel
}
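
For the pub-sub side itself, channel ids come from id_generator::gen_channel_id, which hashes the (room, peer, track) triple. A hypothetical sketch, assuming pubsub::ChannelId implements From<u64> (the function only requires T: From<u64>) and that TrackName wraps a String like PeerId:

```rust
use atm0s_sdn::features::pubsub;
use media_server_protocol::endpoint::{PeerId, TrackName};

let room = ClusterRoomHash(42);
let peer = PeerId("alice".to_string());
let track = TrackName("video_main".to_string());

// The publisher (remote track) and any subscriber (local track) hash the
// same triple, so both sides land on the same channel without signaling.
let channel: pubsub::ChannelId = id_generator::gen_channel_id(room, &peer, &track);
```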