Skip to content

Commit

Permalink
WIP: works app_sync logic, cluster with multi-tennancy by custom Clus…
Browse files Browse the repository at this point in the history
…terRoomHash generator
  • Loading branch information
giangndm committed Sep 29, 2024
1 parent 27d88bc commit eccbf5a
Show file tree
Hide file tree
Showing 59 changed files with 1,086 additions and 464 deletions.
380 changes: 368 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ rand = "0.8"
mockall = "0.13"
prost = "0.13"
indexmap = "2.2"
spin = "0.9"
httpmock = "0.7"
2 changes: 1 addition & 1 deletion bin/src/http/api_console/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Apis {
/// login with user credentials
#[oai(path = "/user/login", method = "post")]
async fn user_login(&self, Data(ctx): Data<&ConsoleApisCtx>, body: Json<UserLoginReq>) -> Json<Response<UserLoginRes>> {
if ctx.secure.validate_secert(&body.secret) {
if ctx.secure.validate_secret(&body.secret) {
Json(Response {
status: true,
data: Some(UserLoginRes { token: ctx.secure.generate_token() }),
Expand Down
14 changes: 5 additions & 9 deletions bin/src/http/api_media/rtpengine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use media_server_protocol::{
cluster::gen_cluster_session_id,
endpoint::ClusterConnId,
tokens::{RtpEngineToken, RTPENGINE_TOKEN},
tokens::RtpEngineToken,
transport::{
rtpengine::{self, RtpCreateAnswerRequest, RtpCreateOfferRequest, RtpSetAnswerRequest},
RpcReq, RpcRes, RpcResult,
Expand Down Expand Up @@ -35,12 +35,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> RtpengineApis<S> {
#[oai(path = "/offer", method = "post")]
async fn create_offer(&self, RemoteIpAddr(ip_addr): RemoteIpAddr, TokenAuthorization(token): TokenAuthorization) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self
.secure
.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, &token.token)
.ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<RtpEngineToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create rtpengine endpoint with token {token:?}, ip {ip_addr}");
let (req, rx) = Rpc::new(RpcReq::RtpEngine(rtpengine::RpcReq::CreateOffer(RtpCreateOfferRequest {
app: app_ctx,
session_id,
room: token.room.into(),
peer: token.peer.into(),
Expand Down Expand Up @@ -77,12 +75,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> RtpengineApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self
.secure
.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, &token.token)
.ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<RtpEngineToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create rtpengine endpoint with token {token:?}, ip {ip_addr}");
let (req, rx) = Rpc::new(RpcReq::RtpEngine(rtpengine::RpcReq::CreateAnswer(RtpCreateAnswerRequest {
app: app_ctx,
session_id,
sdp: body.0,
room: token.room.into(),
Expand Down
12 changes: 8 additions & 4 deletions bin/src/http/api_media/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WebrtcToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create webrtc with token {:?}, ip {}, user_agent {}, request {:?}", token, ip_addr, user_agent, connect);
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
Expand All @@ -47,7 +47,9 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN));
}
}
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(
app_ctx, session_id, ip_addr, user_agent, connect.0, token.extra_data, token.record,
)));
self.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down Expand Up @@ -104,7 +106,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let conn_id2 = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token = self.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WebrtcToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
return Err(poem::Error::from_string("Wrong room".to_string(), StatusCode::FORBIDDEN));
Expand All @@ -115,7 +117,9 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WebrtcApis<S> {
}
}
log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect);
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.extra_data, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(
conn_id2, app_ctx, ip_addr, user_agent, connect.0, token.extra_data, token.record,
)));
self.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down
3 changes: 2 additions & 1 deletion bin/src/http/api_media/whep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WhepApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WhepToken>("whep", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WhepToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create whep endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq {
app: app_ctx,
session_id,
ip: ip_addr,
sdp: body.0,
Expand Down
3 changes: 2 additions & 1 deletion bin/src/http/api_media/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> WhipApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = self.secure.decode_obj::<WhipToken>("whip", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let (app_ctx, token) = self.secure.decode_token::<WhipToken>(&token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
log::info!("[MediaAPIs] create whip endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whip(whip::RpcReq::Connect(WhipConnectReq {
app: app_ctx,
session_id,
ip: ip_addr,
sdp: body.0,
Expand Down
26 changes: 13 additions & 13 deletions bin/src/http/api_token.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{marker::PhantomData, sync::Arc};

use super::{utils::TokenAuthorization, Response};
use media_server_protocol::tokens::{RtpEngineToken, WebrtcToken, WhepToken, WhipToken, RTPENGINE_TOKEN, WEBRTC_TOKEN, WHEP_TOKEN, WHIP_TOKEN};
use media_server_protocol::tokens::{RtpEngineToken, WebrtcToken, WhepToken, WhipToken};
use media_server_secure::MediaGatewaySecure;
use poem::{web::Data, Result};
use poem_openapi::{payload::Json, OpenApi};
Expand Down Expand Up @@ -87,13 +87,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create whip session token
#[oai(path = "/whip", method = "post")]
async fn whip_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WhipTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Result<Json<Response<WhipTokenRes>>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Ok(Json(Response {
status: true,
data: Some(WhipTokenRes {
token: ctx.secure.encode_obj(
WHIP_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WhipToken {
room: body.room,
peer: body.peer,
Expand All @@ -117,13 +117,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create whep session token
#[oai(path = "/whep", method = "post")]
async fn whep_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WhepTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Json<Response<WhepTokenRes>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Json(Response {
status: true,
data: Some(WhepTokenRes {
token: ctx.secure.encode_obj(
WHEP_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WhepToken {
room: body.room,
peer: body.peer,
Expand All @@ -145,13 +145,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {

#[oai(path = "/webrtc", method = "post")]
async fn webrtc_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<WebrtcTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Json<Response<WebrtcTokenRes>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Json(Response {
status: true,
data: Some(WebrtcTokenRes {
token: ctx.secure.encode_obj(
WEBRTC_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
WebrtcToken {
room: body.room,
peer: body.peer,
Expand All @@ -175,13 +175,13 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
/// create rtpengine session token
#[oai(path = "/rtpengine", method = "post")]
async fn rtpengine_token(&self, Data(ctx): Data<&TokenServerCtx<S>>, body: Json<RtpEngineTokenReq>, TokenAuthorization(token): TokenAuthorization) -> Result<Json<Response<RtpEngineTokenRes>>> {
if ctx.secure.validate_app(&token.token) {
if let Some(app_ctx) = ctx.secure.validate_app(&token.token) {
let body = body.0;
Ok(Json(Response {
status: true,
data: Some(RtpEngineTokenRes {
token: ctx.secure.encode_obj(
RTPENGINE_TOKEN,
token: ctx.secure.encode_token(
&app_ctx,
RtpEngineToken {
room: body.room,
peer: body.peer,
Expand Down
11 changes: 6 additions & 5 deletions bin/src/ng_controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use media_server_protocol::cluster::gen_cluster_session_id;
use media_server_protocol::endpoint::{ClusterConnId, PeerId, RoomId};
use media_server_protocol::tokens::{RtpEngineToken, RTPENGINE_TOKEN};
use media_server_protocol::endpoint::ClusterConnId;
use media_server_protocol::tokens::RtpEngineToken;
use media_server_protocol::transport::{rtpengine, RpcReq, RpcRes};
use media_server_secure::MediaEdgeSecure;
use media_server_utils::select2;
Expand Down Expand Up @@ -48,12 +48,13 @@ impl<T: NgTransport, S: 'static + MediaEdgeSecure> NgControllerServer<T, S> {
return Some(());
}
NgCommand::Offer { ref sdp, ref atm0s_token, .. } | NgCommand::Answer { ref sdp, ref atm0s_token, .. } => {
if let Some(token) = self.secure.decode_obj::<RtpEngineToken>(RTPENGINE_TOKEN, atm0s_token) {
if let Some((app_ctx, token)) = self.secure.decode_token::<RtpEngineToken>(atm0s_token) {
let session_id = gen_cluster_session_id();
rtpengine::RpcReq::CreateAnswer(rtpengine::RtpCreateAnswerRequest {
app: app_ctx,
session_id,
room: RoomId(token.room),
peer: PeerId(token.peer),
room: token.room.into(),
peer: token.peer.into(),
sdp: sdp.clone(),
record: token.record,
extra_data: token.extra_data,
Expand Down
22 changes: 20 additions & 2 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner};
use clap::Parser;
use media_server_connector::agent_service::ConnectorAgentServiceBuilder;
use media_server_gateway::{store_service::GatewayStoreServiceBuilder, STORE_SERVICE_ID};
use media_server_gateway::{store_service::GatewayStoreServiceBuilder, MultiTenancyStorage, MultiTenancySync, STORE_SERVICE_ID};
use media_server_protocol::{
cluster::{ClusterGatewayInfo, ClusterNodeGenericInfo, ClusterNodeInfo},
gateway::{generate_gateway_zone_tag, GATEWAY_RPC_PORT},
Expand Down Expand Up @@ -76,6 +76,14 @@ pub struct Args {
/// The port for binding the RTPengine command UDP socket.
#[arg(env, long)]
rtpengine_cmd_addr: Option<SocketAddr>,

/// multi-tenancy sync endpoint
#[arg(env, long)]
multi_tenancy_sync: Option<String>,

/// multi-tenancy sync endpoint
#[arg(env, long, default_value_t = 30_000)]
multi_tenancy_sync_interval_ms: u64,
}

pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
Expand All @@ -90,7 +98,17 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let (connector_agent_tx, mut connector_agent_rx) = tokio::sync::mpsc::channel::<media_server_connector::agent_service::Control>(1024);

let edge_secure = Arc::new(MediaEdgeSecureJwt::from(node.secret.as_bytes()));
let gateway_secure = Arc::new(MediaGatewaySecureJwt::from(node.secret.as_bytes()));
let mut gateway_secure = MediaGatewaySecureJwt::from(node.secret.as_bytes());
if let Some(url) = args.multi_tenancy_sync {
let app_storage = Arc::new(MultiTenancyStorage::default());
gateway_secure.set_app_storage(app_storage.clone());
let mut app_sync = MultiTenancySync::new(app_storage, &url, Duration::from_millis(args.multi_tenancy_sync_interval_ms));
tokio::spawn(async move {
app_sync.run_loop().await;
});
}
let gateway_secure = Arc::new(gateway_secure);

let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024);
if let Some(http_port) = http_port {
let req_tx = req_tx.clone();
Expand Down
23 changes: 18 additions & 5 deletions bin/src/server/gateway/local_rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use media_server_gateway::ServiceKind;
use media_server_protocol::{
endpoint::ClusterConnId,
gateway::GATEWAY_RPC_PORT,
multi_tenancy::AppContext,
protobuf::{
cluster_connector::peer_event::RouteBegin,
cluster_gateway::MediaEdgeServiceClient,
Expand Down Expand Up @@ -127,12 +128,12 @@ impl MediaLocalRpcHandler {
whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)),
},
RpcReq::Webrtc(param) => match param {
webrtc::RpcReq::Connect(session_id, ip, user_agent, param, extra_data, record) => {
RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, extra_data, record).await))
webrtc::RpcReq::Connect(app, session_id, ip, user_agent, param, extra_data, record) => {
RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, app, ip, user_agent, param, extra_data, record).await))
}
webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)),
webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, extra_data, record) => {
RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, extra_data, record).await))
webrtc::RpcReq::RestartIce(conn, app, ip, user_agent, req, extra_data, record) => {
RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, app, ip, user_agent, req, extra_data, record).await))
}
webrtc::RpcReq::Delete(_) => {
//TODO implement delete webrtc conn
Expand Down Expand Up @@ -286,14 +287,24 @@ impl MediaLocalRpcHandler {
Webrtc part
*/

async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, extra_data: Option<String>, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> {
async fn webrtc_connect(
&self,
session_id: u64,
app: AppContext,
ip: IpAddr,
user_agent: String,
req: ConnectRequest,
extra_data: Option<String>,
record: bool,
) -> RpcResult<(ClusterConnId, ConnectResponse)> {
let started_at = now_ms();
self.feedback_route_begin(session_id, ip).await;

if let Some(node_id) = self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(&ip)).await {
let sock_addr = node_vnet_addr(node_id, GATEWAY_RPC_PORT);
log::info!("[Gateway] selected node {node_id}");
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcConnectRequest {
app: Some(app.into()),
session_id,
user_agent,
ip: ip.to_string(),
Expand Down Expand Up @@ -344,6 +355,7 @@ impl MediaLocalRpcHandler {
&self,
conn_part: Option<(NodeId, u64)>,
conn: ClusterConnId,
app: AppContext,
ip: IpAddr,
user_agent: String,
req: ConnectRequest,
Expand All @@ -366,6 +378,7 @@ impl MediaLocalRpcHandler {
};
log::info!("[Gateway] selected dest node {dest} with provided node {node}");
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest {
app: Some(app.into()),
conn: conn.to_string(),
ip: ip.to_string(),
user_agent,
Expand Down
2 changes: 2 additions & 0 deletions bin/src/server/media/rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
async fn webrtc_connect(&self, ctx: &Ctx, req: WebrtcConnectRequest) -> Option<WebrtcConnectResponse> {
log::info!("On webrtc_connect from gateway");
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(
req.app.into(),
req.session_id,
req.ip.parse().ok()?,
req.user_agent,
Expand Down Expand Up @@ -158,6 +159,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
log::info!("On webrtc_restart_ice from gateway");
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(
req.conn.parse().ok()?,
req.app.into(),
req.ip.parse().ok()?,
req.user_agent,
req.req?,
Expand Down
Loading

0 comments on commit eccbf5a

Please sign in to comment.