Skip to content

Commit

Permalink
added metrics for tracking live objects
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Oct 27, 2024
1 parent 2d79689 commit 2da7793
Show file tree
Hide file tree
Showing 23 changed files with 177 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use utils::EmbeddedFilesEndpoint;

mod api_console;
mod api_media;
mod api_metrics;
mod api_token;
mod utils;

Expand Down Expand Up @@ -70,6 +71,10 @@ pub async fn run_console_http_server(
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();
Expand All @@ -91,6 +96,10 @@ pub async fn run_console_http_server(

let route = Route::new()
.nest("/", console_panel)
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
.at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone()))
//user
.nest("/api/user/", user_service.data(ctx.clone()))
.nest("/api/user/ui", user_ui)
Expand Down Expand Up @@ -121,6 +130,10 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
let token_ui = token_service.swagger_ui();
let token_spec = token_service.spec();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

let webrtc_service: OpenApiService<_, ()> = OpenApiService::new(
api_media::WebrtcApis::<ES>::new(sender.clone(), edge_secure.clone()),
"Media Webrtc Gateway APIs",
Expand Down Expand Up @@ -164,18 +177,27 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync

let route = Route::new()
.nest("/samples", samples)
//token
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
.at("/token/spec", poem::endpoint::make_sync(move |_| token_spec.clone()))
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
.at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone()))
//webrtc
.nest("/webrtc/", webrtc_service)
.nest("/webrtc/ui", webrtc_ui)
.at("/webrtc/spec", poem::endpoint::make_sync(move |_| webrtc_spec.clone()))
//whip
.nest("/whip/", whip_service)
.nest("/whip/ui", whip_ui)
.at("/whip/spec", poem::endpoint::make_sync(move |_| whip_spec.clone()))
//whep
.nest("/whep/", whep_service)
.nest("/whep/ui", whep_ui)
.at("/whep/spec", poem::endpoint::make_sync(move |_| whep_spec.clone()))
//rtpengine
.nest("/rtpengine/", rtpengine_service)
.nest("/rtpengine/ui", rtpengine_ui)
.at("/rtpengine/spec", poem::endpoint::make_sync(move |_| rtpengine_spec.clone()))
Expand All @@ -194,6 +216,10 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

let metrics_service: OpenApiService<_, ()> = OpenApiService::new(api_metrics::Apis, "Console Metrics APIs", env!("CARGO_PKG_VERSION")).server("/api/metrics/");
let metrics_ui = metrics_service.swagger_ui();
let metrics_spec = metrics_service.spec();

if let Some(gateway_secure) = gateway_secure {
let token_service: OpenApiService<_, ()> = OpenApiService::new(api_token::TokenApis::<GS>::new(), "App APIs", env!("CARGO_PKG_VERSION")).server("/token/");
let token_ui = token_service.swagger_ui();
Expand Down Expand Up @@ -247,15 +273,23 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
//metrics
.nest("/api/metrics/", metrics_service)
.nest("/api/metrics/ui", metrics_ui)
.at("/api/metrics/spec", poem::endpoint::make_sync(move |_| metrics_spec.clone()))
//webrtc
.nest("/webrtc/", webrtc_service)
.nest("/webrtc/ui", webrtc_ui)
.at("/webrtc/spec", poem::endpoint::make_sync(move |_| webrtc_spec.clone()))
//whip
.nest("/whip/", whip_service)
.nest("/whip/ui", whip_ui)
.at("/whip/spec", poem::endpoint::make_sync(move |_| whip_spec.clone()))
//whep
.nest("/whep/", whep_service)
.nest("/whep/ui", whep_ui)
.at("/whep/spec", poem::endpoint::make_sync(move |_| whep_spec.clone()))
//rtpengine
.nest("/rtpengine/", rtpengine_service)
.nest("/rtpengine/ui", rtpengine_ui)
.at("/rtpengine/spec", poem::endpoint::make_sync(move |_| rtpengine_spec.clone()))
Expand Down
14 changes: 14 additions & 0 deletions bin/src/http/api_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::collections::BTreeMap;

use media_server_utils::get_all_counts;
use poem_openapi::{payload::Json, OpenApi};

pub struct Apis;

#[OpenApi]
impl Apis {
#[oai(path = "/counts", method = "get")]
async fn get_counts(&self) -> Json<BTreeMap<String, usize>> {
Json(get_all_counts().into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
}
10 changes: 6 additions & 4 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));
let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
if let Some(http_port) = http_port {
let app_storage = Arc::new(MultiTenancyStorage::new(&node.secret, None));
let secure2 = args.enable_token_api.then(|| Arc::new(MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage)));
let secure_gateway = args.enable_token_api.then(|| {
let app_storage = Arc::new(MultiTenancyStorage::new(&node.secret, None));
Arc::new(MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage))
});
let req_tx = req_tx.clone();
let secure = secure.clone();
let secure_edge = secure.clone();
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2).await {
if let Err(e) = run_media_http_server(http_port, req_tx, secure_edge, secure_gateway).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down
3 changes: 3 additions & 0 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{fmt::Debug, hash::Hash, time::Instant};

use atm0s_sdn::features::{dht_kv, FeaturesControl, FeaturesEvent};
use media_server_protocol::message_channel::MessageChannelPacket;
use media_server_utils::Count;
use message_channel::RoomMessageChannel;
use sans_io_runtime::{return_if_none, Task, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild};

Expand Down Expand Up @@ -64,6 +65,7 @@ enum TaskType {
}

pub struct ClusterRoom<Endpoint: Debug + Copy + Clone + Hash + Eq> {
_c: Count<Self>,
room: ClusterRoomHash,
metadata: TaskSwitcherBranch<RoomMetadata<Endpoint>, metadata::Output<Endpoint>>,
media_track: TaskSwitcherBranch<MediaTrack<Endpoint>, media_track::Output<Endpoint>>,
Expand Down Expand Up @@ -153,6 +155,7 @@ impl<Endpoint: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
let mixer_channel_id = id_generator::gen_mixer_auto_channel_id(room);
Self {
_c: Default::default(),
room,
metadata: TaskSwitcherBranch::new(RoomMetadata::new(room), TaskType::Metadata),
media_track: TaskSwitcherBranch::new(MediaTrack::new(room), TaskType::MediaTrack),
Expand Down
3 changes: 3 additions & 0 deletions packages/media_core/src/cluster/room/audio_mixer/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use media_server_protocol::{
media::{MediaMeta, MediaPacket},
transport::LocalTrackId,
};
use media_server_utils::Count;
use sans_io_runtime::{collections::DynamicDeque, Task, TaskSwitcherChild};

use crate::cluster::{id_generator, ClusterAudioMixerEvent, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRoomHash};
Expand All @@ -30,6 +31,7 @@ pub enum Input {
}

pub struct ManualMixer<Endpoint> {
_c: Count<Self>,
endpoint: Endpoint,
room: ClusterRoomHash,
outputs: Vec<LocalTrackId>,
Expand All @@ -41,6 +43,7 @@ pub struct ManualMixer<Endpoint> {
impl<Endpoint: Clone> ManualMixer<Endpoint> {
pub fn new(room: ClusterRoomHash, endpoint: Endpoint, outputs: Vec<LocalTrackId>) -> Self {
Self {
_c: Default::default(),
endpoint,
room,
mixer: audio_mixer::AudioMixer::new(outputs.len()),
Expand Down
3 changes: 3 additions & 0 deletions packages/media_core/src/cluster/room/audio_mixer/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use media_server_protocol::{
endpoint::{AudioMixerPkt, PeerHashCode, PeerId, TrackName},
media::{MediaMeta, MediaPacket},
};
use media_server_utils::Count;
use sans_io_runtime::{collections::DynamicDeque, TaskSwitcherChild};

use crate::transport::RemoteTrackId;
Expand All @@ -29,6 +30,7 @@ struct OutputSlot {
}

pub struct AudioMixerPublisher<Endpoint> {
_c: Count<Self>,
channel_id: pubsub::ChannelId,
tracks: HashMap<(Endpoint, RemoteTrackId), TrackSlot>,
mixer: audio_mixer::AudioMixer<(Endpoint, RemoteTrackId)>,
Expand All @@ -39,6 +41,7 @@ pub struct AudioMixerPublisher<Endpoint> {
impl<Endpoint: Debug + Clone + Eq + Hash> AudioMixerPublisher<Endpoint> {
pub fn new(channel_id: ChannelId) -> Self {
Self {
_c: Default::default(),
tracks: Default::default(),
channel_id,
mixer: audio_mixer::AudioMixer::new(3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use media_server_protocol::{
media::{MediaMeta, MediaPacket},
transport::LocalTrackId,
};
use media_server_utils::Count;
use sans_io_runtime::{collections::DynamicDeque, return_if_none, TaskSwitcherChild};

use crate::cluster::{ClusterAudioMixerEvent, ClusterEndpointEvent, ClusterLocalTrackEvent};
Expand All @@ -27,6 +28,7 @@ struct OutputSlot {
}

pub struct AudioMixerSubscriber<Endpoint, const OUTPUTS: usize> {
_c: Count<Self>,
channel_id: ChannelId,
queue: DynamicDeque<Output<Endpoint>, 16>,
endpoints: IndexMap<Endpoint, EndpointSlot>,
Expand All @@ -37,6 +39,7 @@ pub struct AudioMixerSubscriber<Endpoint, const OUTPUTS: usize> {
impl<Endpoint: Debug + Hash + Eq + Clone, const OUTPUTS: usize> AudioMixerSubscriber<Endpoint, OUTPUTS> {
pub fn new(channel_id: ChannelId) -> Self {
Self {
_c: Default::default(),
channel_id,
queue: Default::default(),
endpoints: IndexMap::new(),
Expand Down
3 changes: 3 additions & 0 deletions packages/media_core/src/cluster/room/media_track/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use media_server_protocol::{
endpoint::{PeerId, TrackName},
media::MediaPacket,
};
use media_server_utils::Count;
use sans_io_runtime::{return_if_err, return_if_none, TaskSwitcherChild};

use crate::{
Expand All @@ -39,6 +40,7 @@ impl TryFrom<Feedback> for FeedbackKind {
}

pub struct RoomChannelPublisher<Endpoint> {
_c: Count<Self>,
room: ClusterRoomHash,
tracks: HashMap<(Endpoint, RemoteTrackId), (PeerId, TrackName, ChannelId)>,
tracks_source: HashMap<ChannelId, HashSet<(Endpoint, RemoteTrackId)>>, // We allow multi sources here for avoiding crash
Expand All @@ -48,6 +50,7 @@ pub struct RoomChannelPublisher<Endpoint> {
impl<Endpoint: Debug + Hash + Eq + Copy> RoomChannelPublisher<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
Self {
_c: Default::default(),
room,
tracks: HashMap::new(),
tracks_source: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use media_server_protocol::{
endpoint::{PeerId, TrackName},
media::MediaPacket,
};
use media_server_utils::Count;
use sans_io_runtime::{return_if_none, TaskSwitcherChild};

use crate::{
Expand All @@ -44,6 +45,7 @@ struct ChannelContainer<Endpoint> {
}

pub struct RoomChannelSubscribe<Endpoint> {
_c: Count<Self>,
room: ClusterRoomHash,
channels: HashMap<ChannelId, ChannelContainer<Endpoint>>,
subscribers: HashMap<(Endpoint, LocalTrackId), (ChannelId, PeerId, TrackName)>,
Expand All @@ -53,6 +55,7 @@ pub struct RoomChannelSubscribe<Endpoint> {
impl<Endpoint: Hash + Eq + Copy + Debug> RoomChannelSubscribe<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
Self {
_c: Default::default(),
room,
channels: HashMap::new(),
subscribers: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId};
use media_server_protocol::message_channel::MessageChannelPacket;
use media_server_utils::Count;
use sans_io_runtime::{return_if_none, TaskSwitcherChild};

use crate::{
Expand All @@ -20,6 +21,7 @@ struct ChannelContainer<Endpoint> {
}

pub struct MessageChannelPublisher<Endpoint> {
_c: Count<Self>,
room: ClusterRoomHash,
channels: HashMap<ChannelId, ChannelContainer<Endpoint>>,
publishers: HashMap<Endpoint, HashSet<ChannelId>>,
Expand All @@ -29,6 +31,7 @@ pub struct MessageChannelPublisher<Endpoint> {
impl<Endpoint: Hash + Eq + Copy + Debug> MessageChannelPublisher<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
Self {
_c: Default::default(),
room,
queue: VecDeque::new(),
channels: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId};
use media_server_protocol::message_channel::MessageChannelPacket;
use media_server_utils::Count;
use sans_io_runtime::{return_if_none, TaskSwitcherChild};

use super::Output;
Expand All @@ -20,6 +21,7 @@ struct ChannelContainer<Endpoint> {
}

pub struct MessageChannelSubscriber<Endpoint> {
_c: Count<Self>,
room: ClusterRoomHash,
channels: HashMap<ChannelId, ChannelContainer<Endpoint>>,
subscriptions: HashMap<Endpoint, HashSet<ChannelId>>,
Expand All @@ -29,6 +31,7 @@ pub struct MessageChannelSubscriber<Endpoint> {
impl<Endpoint: Hash + Eq + Copy + Debug> MessageChannelSubscriber<Endpoint> {
pub fn new(room: ClusterRoomHash) -> Self {
Self {
_c: Default::default(),
room,
queue: VecDeque::new(),
channels: HashMap::new(),
Expand Down
4 changes: 3 additions & 1 deletion packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use media_server_protocol::{
record::SessionRecordEvent,
transport::RpcResult,
};
use media_server_utils::Count;
use sans_io_runtime::{
backend::{BackendIncoming, BackendOutgoing},
return_if_some, Task, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild,
Expand All @@ -25,7 +26,6 @@ use internal::EndpointInternal;
use self::internal::InternalOutput;

mod internal;
mod middleware;

pub struct EndpointSession(pub u64);

Expand Down Expand Up @@ -226,6 +226,7 @@ pub struct EndpointCfg {
}

pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
_c: Count<Self>,
app: AppId,
session_id: u64,
transport: TaskSwitcherBranch<T, TransportOutput<ExtOut>>,
Expand All @@ -237,6 +238,7 @@ pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
pub fn new(session_id: u64, cfg: EndpointCfg, transport: T) -> Self {
Self {
_c: Default::default(),
app: cfg.app.app.clone(),
session_id,
transport: TaskSwitcherBranch::new(transport, TaskType::Transport),
Expand Down
Loading

0 comments on commit 2da7793

Please sign in to comment.