Skip to content

Commit

Permalink
added token secure
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Jul 29, 2024
1 parent 7bf7f43 commit 27b7434
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 81 deletions.
52 changes: 48 additions & 4 deletions bin/src/http/api_token.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, sync::Arc};

use super::{utils::TokenAuthorization, Response};
use media_server_protocol::tokens::{WebrtcToken, WhepToken, WhipToken};
use media_server_protocol::tokens::{RtpEngineToken, WebrtcToken, WhepToken, WhipToken, RTPENGINE_TOKEN, WEBRTC_TOKEN, WHEP_TOKEN, WHIP_TOKEN};
use media_server_secure::MediaGatewaySecure;
use poem::{web::Data, Result};
use poem_openapi::{payload::Json, OpenApi};
Expand Down Expand Up @@ -60,6 +60,20 @@ struct WebrtcTokenRes {
token: String,
}

#[derive(poem_openapi::Object)]
struct RtpEngineTokenReq {
room: String,
peer: String,
ttl: u64,
record: Option<bool>,
extra_data: Option<String>,
}

#[derive(poem_openapi::Object)]
struct RtpEngineTokenRes {
token: String,
}

pub struct TokenApis<S: MediaGatewaySecure + Send + Sync>(PhantomData<S>);

impl<S: MediaGatewaySecure + Send + Sync> TokenApis<S> {
Expand All @@ -79,7 +93,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
status: true,
data: Some(WhipTokenRes {
token: ctx.secure.encode_obj(
"whip",
WHIP_TOKEN,
WhipToken {
room: body.room,
peer: body.peer,
Expand Down Expand Up @@ -109,7 +123,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
status: true,
data: Some(WhepTokenRes {
token: ctx.secure.encode_obj(
"whep",
WHEP_TOKEN,
WhepToken {
room: body.room,
peer: body.peer,
Expand Down Expand Up @@ -137,7 +151,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
status: true,
data: Some(WebrtcTokenRes {
token: ctx.secure.encode_obj(
"webrtc",
WEBRTC_TOKEN,
WebrtcToken {
room: body.room,
peer: body.peer,
Expand All @@ -157,4 +171,34 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
})
}
}

/// create rtpengine session token
#[oai(path = "/rtpengine", method = "post")]
async fn rtpengine_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<RtpEngineTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Result<Json<Response<RtpEngineTokenRes>>> {
if ctx.secure.validate_app(&token.token) {
let body = body.0;
Ok(Json(Response {
status: true,
data: Some(RtpEngineTokenRes {
token: ctx.secure.encode_obj(
RTPENGINE_TOKEN,
RtpEngineToken {
room: body.room,
peer: body.peer,
record: body.record.unwrap_or(false),
extra_data: body.extra_data,
},
body.ttl,
),
}),
..Default::default()
}))
} else {
Ok(Json(Response {
status: false,
error: Some("APP_TOKEN_INVALID".to_string()),
..Default::default()
}))
}
}
}
94 changes: 47 additions & 47 deletions bin/src/ng_controller.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
use media_server_protocol::cluster::gen_cluster_session_id;
use media_server_protocol::endpoint::{ClusterConnId, PeerId, RoomId};
use media_server_protocol::tokens::{RtpEngineToken, RTPENGINE_TOKEN};
use media_server_protocol::transport::{rtpengine, RpcReq, RpcRes};
use media_server_secure::MediaEdgeSecure;
use media_server_utils::select2;
use rtpengine_ngcontrol::{NgCmdResult, NgCommand, NgRequest, NgResponse, NgTransport};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::rpc::Rpc;

pub struct NgControllerServer<T> {
pub struct NgControllerServer<T, S> {
transport: T,
secure: Arc<S>,
rpc_sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
answer_tx: Sender<(String, RpcRes<ClusterConnId>, SocketAddr)>,
answer_rx: Receiver<(String, RpcRes<ClusterConnId>, SocketAddr)>,
history: HashMap<String, ClusterConnId>,
}

impl<T: NgTransport> NgControllerServer<T> {
pub fn new(transport: T, rpc_sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Self {
impl<T: NgTransport, S: 'static + MediaEdgeSecure> NgControllerServer<T, S> {
pub fn new(transport: T, secure: Arc<S>, rpc_sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Self {
let (answer_tx, answer_rx) = channel(10);
Self {
transport,
secure,
rpc_sender,
answer_tx,
answer_rx,
history: HashMap::new(),
}
}

Expand All @@ -41,40 +44,30 @@ impl<T: NgTransport> NgControllerServer<T> {
async fn process_req(&mut self, req: NgRequest, remote: SocketAddr) -> Option<()> {
let rpc_req = match req.command {
NgCommand::Ping => {
self.transport.send(req.answer(NgCmdResult::Pong { result: "OK".to_string() }), remote).await;
self.transport.send(req.answer(NgCmdResult::Pong { result: "ok".to_string() }), remote).await;
return Some(());
}
NgCommand::Offer { sdp, call_id, from_tag, .. } => {
let session_id = gen_cluster_session_id();
rtpengine::RpcReq::Connect(rtpengine::RtpConnectRequest {
call_id: RoomId(call_id),
leg_id: PeerId(from_tag),
sdp,
session_id,
})
}
NgCommand::Answer { sdp, call_id, to_tag, .. } => {
let session_id = gen_cluster_session_id();
rtpengine::RpcReq::Connect(rtpengine::RtpConnectRequest {
call_id: RoomId(call_id),
leg_id: PeerId(to_tag),
sdp,
session_id,
})
NgCommand::Offer { ref sdp, ref atm0s_token, .. } | NgCommand::Answer { ref sdp, ref atm0s_token, .. } => {
if let Some(token) = self.secure.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, atm0s_token) {
let session_id = gen_cluster_session_id();
rtpengine::RpcReq::Connect(rtpengine::RtpConnectRequest {
session_id,
room: RoomId(token.room),
peer: PeerId(token.peer),
sdp: sdp.clone(),
record: token.record,
extra_data: token.extra_data,
})
} else {
self.send_err(&req, "TOKEN_FAILED", "Token parse error", remote).await;
return Some(());
}
}
NgCommand::Delete { ref from_tag, .. } => {
if let Some(conn) = self.history.get(from_tag) {
rtpengine::RpcReq::Delete(*conn)
NgCommand::Delete { ref conn_id, .. } => {
if let Ok(conn) = ClusterConnId::from_str(conn_id) {
rtpengine::RpcReq::Delete(conn)
} else {
self.transport
.send(
req.answer(NgCmdResult::Error {
error_reason: "NOT_FOUND".to_string(),
result: "Not found".to_string(),
}),
remote,
)
.await;
self.send_err(&req, "NOT_FOUND", "Connection parse error", remote).await;
return Some(());
}
}
Expand All @@ -100,17 +93,12 @@ impl<T: NgTransport> NgControllerServer<T> {

async fn process_res(&mut self, id: String, res: RpcRes<ClusterConnId>, dest: SocketAddr) -> Option<()> {
let result = match res {
RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Ok((peer, conn, sdp)))) => {
self.history.insert(peer.0, conn);
NgCmdResult::Answer {
result: "ok".to_string(),
sdp: Some(sdp),
}
}
RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Ok(peer_id))) => {
self.history.remove(&peer_id.0);
NgCmdResult::Delete { result: "ok".to_string() }
}
RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Ok((conn, sdp)))) => NgCmdResult::Answer {
result: "ok".to_string(),
conn: Some(conn.to_string()),
sdp: Some(sdp),
},
RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Ok(_conn))) => NgCmdResult::Delete { result: "ok".to_string() },
RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Err(res))) | RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Err(res))) => NgCmdResult::Error {
result: res.code.to_string(),
error_reason: res.message,
Expand All @@ -122,4 +110,16 @@ impl<T: NgTransport> NgControllerServer<T> {
self.transport.send(NgResponse { id, result }, dest).await;
Some(())
}

async fn send_err(&self, req: &NgRequest, result: &str, err: &str, remote: SocketAddr) {
self.transport
.send(
req.answer(NgCmdResult::Error {
error_reason: err.to_string(),
result: result.to_string(),
}),
remote,
)
.await;
}
}
25 changes: 14 additions & 11 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub struct Args {
webrtc_port_seed: u16,

/// Port for binding rtpengine command UDP socket.
#[arg(env, long, default_value_t = 22222)]
rtpengine_cmd_port: u16,
#[arg(env, long, default_value = "127.0.0.1:22222")]
rtpengine_cmd_addr: Option<SocketAddr>,

/// RtpEngine RTP Listen IP
/// Default: 127.0.0.1
Expand Down Expand Up @@ -91,8 +91,8 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));
let secure2 = args.enable_token_api.then(|| Arc::new(MediaGatewaySecureJwt::from(node.secret.as_bytes())));
let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
let req_tx2 = req_tx.clone();
if let Some(http_port) = http_port {
let req_tx2 = req_tx.clone();
let secure = secure.clone();
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx2, secure, secure2).await {
Expand All @@ -102,14 +102,17 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
}

//Running ng controller for Voip
let req_tx3 = req_tx.clone();
let rtpengine_udp = NgUdpTransport::new(args.rtpengine_cmd_port).await;
tokio::spawn(async move {
log::info!("[MediaServer] start ng_controller task");
let mut server = NgControllerServer::new(rtpengine_udp, req_tx3);
while server.recv().await.is_some() {}
log::info!("[MediaServer] stop ng_controller task");
});
if let Some(ngproto_addr) = args.rtpengine_cmd_addr {
let req_tx3 = req_tx.clone();
let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await;
let secure2 = secure.clone();
tokio::spawn(async move {
log::info!("[MediaServer] start ng_controller task");
let mut server = NgControllerServer::new(rtpengine_udp, secure2, req_tx3);
while server.recv().await.is_some() {}
log::info!("[MediaServer] stop ng_controller task");
});
}

let node_id = node.node_id;
let node_session = random();
Expand Down
1 change: 1 addition & 0 deletions packages/media_gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod store_service;
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum ServiceKind {
Webrtc,
RtpEngine,
}

#[derive(Debug, Clone, Default)]
Expand Down
3 changes: 3 additions & 0 deletions packages/media_gateway/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct GatewayStore {
node: NodeMetrics,
location: Location,
webrtc: ServiceStore,
rtpengine: ServiceStore,
output: Option<PingEvent>,
max_cpu: u8,
max_memory: u8,
Expand All @@ -31,6 +32,7 @@ impl GatewayStore {
Self {
node: NodeMetrics::default(),
webrtc: ServiceStore::new(ServiceKind::Webrtc, location),
rtpengine: ServiceStore::new(ServiceKind::RtpEngine, location),
zone,
location,
output: None,
Expand Down Expand Up @@ -90,6 +92,7 @@ impl GatewayStore {
pub fn best_for(&self, kind: ServiceKind, location: Option<Location>) -> Option<u32> {
let node = match kind {
ServiceKind::Webrtc => self.webrtc.best_for(location),
ServiceKind::RtpEngine => self.rtpengine.best_for(location),
};
log::debug!("[GatewayStore] query best {:?} for {:?} got {:?}", kind, location, node);
node
Expand Down
19 changes: 14 additions & 5 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
media_server_gateway::agent_service::Control::WorkerUsage(ServiceKind::Webrtc, self.worker, webrtc_live).into(),
)),
);

let rtpengine_live = self.media_rtpengine.tasks() as u32;
self.sdn_worker.input(s).on_event(
now_ms,
SdnWorkerInput::ExtWorker(SdnExtIn::ServicesControl(
AGENT_SERVICE_ID.into(),
UserData::Cluster,
media_server_gateway::agent_service::Control::WorkerUsage(ServiceKind::RtpEngine, self.worker, rtpengine_live).into(),
)),
);
}
}

Expand Down Expand Up @@ -484,8 +494,8 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {

fn output_rtpengine(&mut self, now: Instant, out: transport_rtpengine::GroupOutput) -> Output {
match out {
transport_rtpengine::GroupOutput::Ext(_session, ext) => match ext {
transport_rtpengine::ExtOut::Disconnect(req_id, peer_id) => Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Ok(peer_id)))),
transport_rtpengine::GroupOutput::Ext(session, ext) => match ext {
transport_rtpengine::ExtOut::Disconnect(req_id) => Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Ok(session.index())))),
},
transport_rtpengine::GroupOutput::Net(child, net) => Output::Net(Owner::RtpEngine(child), net),
transport_rtpengine::GroupOutput::Cluster(session, room, control) => {
Expand Down Expand Up @@ -620,13 +630,12 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
RpcReq::RtpEngine(req) => match req {
rtpengine::RpcReq::Connect(conn_req) => {
log::info!("[MediaServerWorker] on rpc request {req_id}, rtpengine::RpcReq::Connect");
let peer_id = conn_req.leg_id.clone();
match self
.media_rtpengine
.input(&mut self.switcher)
.spawn(conn_req.call_id, conn_req.leg_id, false, conn_req.session_id, &conn_req.sdp)
.spawn(conn_req.room, conn_req.peer, false, conn_req.session_id, &conn_req.sdp)
{
Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Ok((peer_id, conn_id, sdp)))))),
Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Ok((conn_id, sdp)))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::Connect(Err(e))))),
}
}
Expand Down
13 changes: 13 additions & 0 deletions packages/protocol/src/tokens.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use serde::{Deserialize, Serialize};

pub const WHIP_TOKEN: &str = "whip";
pub const WHEP_TOKEN: &str = "whep";
pub const WEBRTC_TOKEN: &str = "webrtc";
pub const RTPENGINE_TOKEN: &str = "rtpengine";

#[derive(Serialize, Deserialize, Debug)]
pub struct WhipToken {
pub room: String,
Expand All @@ -22,3 +27,11 @@ pub struct WebrtcToken {
pub record: bool,
pub extra_data: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct RtpEngineToken {
pub room: String,
pub peer: String,
pub record: bool,
pub extra_data: Option<String>,
}
Loading

0 comments on commit 27b7434

Please sign in to comment.