Skip to content

Commit

Permalink
feat: embedded userdata to token (#379)
Browse files Browse the repository at this point in the history
* feat: embedded userdata to token

* temp fix clippy warns
  • Loading branch information
giangndm authored Jul 24, 2024
1 parent c264d57 commit a0e80d9
Show file tree
Hide file tree
Showing 26 changed files with 154 additions and 63 deletions.
6 changes: 4 additions & 2 deletions bin/src/http/api_media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
peer: token.peer.into(),
user_agent,
record: token.record,
userdata: token.userdata,
})));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
Expand Down Expand Up @@ -167,6 +168,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
room: token.room.into(),
peer: token.peer.unwrap_or_else(|| format!("whep-{}", (random::<u64>()))).into(),
user_agent,
userdata: token.userdata,
})));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
Expand Down Expand Up @@ -265,7 +267,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
return Err(poem::Error::from_string("Wrong peer".to_string(), StatusCode::FORBIDDEN));
}
}
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(session_id, ip_addr, user_agent, connect.0, token.userdata, token.record)));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down Expand Up @@ -334,7 +336,7 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
}
}
log::info!("[MediaAPIs] restart_ice webrtc, ip {}, user_agent {}, conn {}, request {:?}", ip_addr, user_agent, conn_id.0, connect);
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::RestartIce(conn_id2, ip_addr, user_agent, connect.0, token.userdata, token.record)));
ctx.sender.send(req).await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
let res = rx.await.map_err(|_e| poem::Error::from_status(StatusCode::INTERNAL_SERVER_ERROR))?;
match res {
Expand Down
15 changes: 14 additions & 1 deletion bin/src/http/api_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct WhipTokenReq {
peer: String,
ttl: u64,
record: Option<bool>,
userdata: Option<String>,
}

#[derive(poem_openapi::Object)]
Expand All @@ -37,6 +38,7 @@ struct WhepTokenReq {
room: String,
peer: Option<String>,
ttl: u64,
userdata: Option<String>,
}

#[derive(poem_openapi::Object)]
Expand All @@ -50,6 +52,7 @@ struct WebrtcTokenReq {
peer: Option<String>,
ttl: u64,
record: Option<bool>,
userdata: Option<String>,
}

#[derive(poem_openapi::Object)]
Expand Down Expand Up @@ -81,6 +84,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
room: body.room,
peer: body.peer,
record: body.record.unwrap_or(false),
userdata: body.userdata,
},
body.ttl,
),
Expand All @@ -104,7 +108,15 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
Json(Response {
status: true,
data: Some(WhepTokenRes {
token: ctx.secure.encode_obj("whep", WhepToken { room: body.room, peer: body.peer }, body.ttl),
token: ctx.secure.encode_obj(
"whep",
WhepToken {
room: body.room,
peer: body.peer,
userdata: body.userdata,
},
body.ttl,
),
}),
..Default::default()
})
Expand All @@ -130,6 +142,7 @@ impl<S: 'static + MediaGatewaySecure + Send + Sync> TokenApis<S> {
room: body.room,
peer: body.peer,
record: body.record.unwrap_or(false),
userdata: body.userdata,
},
body.ttl,
),
Expand Down
14 changes: 10 additions & 4 deletions bin/src/server/gateway/local_rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ impl MediaLocalRpcHandler {
whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)),
},
RpcReq::Webrtc(param) => match param {
webrtc::RpcReq::Connect(session_id, ip, user_agent, param, record) => RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, record).await)),
webrtc::RpcReq::Connect(session_id, ip, user_agent, param, userdata, record) => {
RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(session_id, ip, user_agent, param, userdata, record).await))
}
webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)),
webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, record) => {
RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, record).await))
webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => {
RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req, userdata, record).await))
}
webrtc::RpcReq::Delete(_) => {
//TODO implement delete webrtc conn
Expand Down Expand Up @@ -274,7 +276,7 @@ impl MediaLocalRpcHandler {
Webrtc part
*/

async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> {
async fn webrtc_connect(&self, session_id: u64, ip: IpAddr, user_agent: String, req: ConnectRequest, userdata: Option<String>, record: bool) -> RpcResult<(ClusterConnId, ConnectResponse)> {
let started_at = now_ms();
self.feedback_route_begin(session_id, ip).await;

Expand All @@ -287,6 +289,7 @@ impl MediaLocalRpcHandler {
ip: ip.to_string(),
req: Some(req),
record,
userdata,
};
let res = self.client.webrtc_connect(sock_addr, rpc_req).await;
log::info!("[Gateway] response from node {node_id} => {:?}", res);
Expand Down Expand Up @@ -332,13 +335,15 @@ impl MediaLocalRpcHandler {
}
}

#[allow(clippy::too_many_arguments)]
async fn webrtc_restart_ice(
&self,
conn_part: Option<(NodeId, u64)>,
conn: ClusterConnId,
ip: IpAddr,
user_agent: String,
req: ConnectRequest,
userdata: Option<String>,
record: bool,
) -> RpcResult<(ClusterConnId, ConnectResponse)> {
//TODO how to handle media-node down?
Expand All @@ -349,6 +354,7 @@ impl MediaLocalRpcHandler {
user_agent,
req: Some(req),
record,
userdata,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
Expand Down
10 changes: 9 additions & 1 deletion bin/src/server/media/rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,14 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
/* Start of sdk */
async fn webrtc_connect(&self, ctx: &Ctx, req: WebrtcConnectRequest) -> Option<WebrtcConnectResponse> {
log::info!("On webrtc_connect from gateway");
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(req.session_id, req.ip.parse().ok()?, req.user_agent, req.req?, req.record)));
let (req, rx) = Rpc::new(RpcReq::Webrtc(webrtc::RpcReq::Connect(
req.session_id,
req.ip.parse().ok()?,
req.user_agent,
req.req?,
req.userdata,
req.record,
)));
ctx.req_tx.send(req).await.ok()?;
let res = rx.await.ok()?;
match res {
Expand Down Expand Up @@ -152,6 +159,7 @@ impl MediaEdgeServiceHandler<Ctx> for MediaRpcHandlerImpl {
req.ip.parse().ok()?,
req.user_agent,
req.req?,
req.userdata,
req.record,
)));
ctx.req_tx.send(req).await.ok()?;
Expand Down
1 change: 1 addition & 0 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, Work

use crate::NodeConfig;

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum ExtIn {
/// ext, send controller or worker, true is controller
Expand Down
1 change: 1 addition & 0 deletions docs/user-guide/features/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ In this document, we will explore the key features of atm0s-media-server. Curren
| [Simulcast/SVC](./simulcast-svc.md) | Alpha |
| [Recording](./recording.md) | TODO |
| [Cluster](./cluster.md) | Alpha |
| [Userdata-metadata](./userdata-metadata.md) | Alpha |
6 changes: 6 additions & 0 deletions docs/user-guide/features/userdata-metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Userdata and metadata

Userdata and metadata is used with same goal is providing addition data beside of only room and peer. We use 2 terms userdata and metadata with some difference:

- Userdata: a string which is embedded to peer token, which can not be changed by client
- Metadata: a string which is embedded to peer or track by client. In there peer metadata is set at join-room step, track metadata is set at publish track step.
2 changes: 1 addition & 1 deletion packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod tests {
let room_peers_map = id_generator::peers_map(userdata.0);
let peer = PeerId("peer1".to_string());
let peer_key = id_generator::peers_key(&peer);
let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None });
let peer_info = PeerInfo::new(peer.clone(), PeerMeta { metadata: None, userdata: None });

let now = Instant::now();
// Not join room with scope (peer true, track false) should Set and Sub
Expand Down
2 changes: 1 addition & 1 deletion packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ mod tests {
endpoint,
ClusterEndpointControl::Join(
peer.clone(),
PeerMeta { metadata: None },
PeerMeta { metadata: None, userdata: None },
RoomInfoPublish { peer: false, tracks: false },
RoomInfoSubscribe { peers: true, tracks: true },
Some(AudioMixerConfig {
Expand Down
24 changes: 12 additions & 12 deletions packages/media_core/src/cluster/room/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
Expand All @@ -408,7 +408,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
Expand Down Expand Up @@ -464,14 +464,14 @@ mod tests {

let peer2: PeerId = "peer2".to_string().into();
let peer2_key = id_generator::peers_key(&peer2);
let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None });
let peer2_info = PeerInfo::new(peer2, PeerMeta { metadata: None, userdata: None });

room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize()));
assert_eq!(room_meta.pop_output(()), None);

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
Expand Down Expand Up @@ -500,7 +500,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
Expand Down Expand Up @@ -569,7 +569,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
Expand Down Expand Up @@ -601,7 +601,7 @@ mod tests {
let tracks_map = id_generator::tracks_map(room);
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone());
let peer_key = id_generator::peers_key(&peer_id);
let endpoint = 1;
Expand Down Expand Up @@ -643,7 +643,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
Expand Down Expand Up @@ -702,7 +702,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
Expand Down Expand Up @@ -748,7 +748,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
Expand Down Expand Up @@ -783,7 +783,7 @@ mod tests {

let endpoint = 1;
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
room_meta.on_join(
endpoint,
peer_id.clone(),
Expand Down Expand Up @@ -823,7 +823,7 @@ mod tests {
let room: ClusterRoomHash = 1.into();
let mut room_meta: RoomMetadata<u8> = RoomMetadata::<u8>::new(room);
let peer_id: PeerId = "peer1".to_string().into();
let peer_meta = PeerMeta { metadata: None };
let peer_meta = PeerMeta { metadata: None, userdata: None };
let endpoint = 1;
room_meta.on_join(
endpoint,
Expand Down
4 changes: 2 additions & 2 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ mod tests {

let room: RoomId = "room".into();
let peer: PeerId = "peer".into();
let meta = PeerMeta { metadata: None };
let meta = PeerMeta { metadata: None, userdata: None };
let publish = RoomInfoPublish { peer: true, tracks: true };
let subscribe = RoomInfoSubscribe { peers: true, tracks: true };
internal.on_transport_rpc(now, 0.into(), EndpointReq::JoinRoom(room.clone(), peer.clone(), meta.clone(), publish.clone(), subscribe.clone(), None));
Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {
let room1: RoomId = "room1".into();
let room1_hash = ClusterRoomHash::from(&room1);
let peer: PeerId = "peer".into();
let meta = PeerMeta { metadata: None };
let meta = PeerMeta { metadata: None, userdata: None };
let publish = RoomInfoPublish { peer: true, tracks: true };
let subscribe = RoomInfoSubscribe { peers: true, tracks: true };
internal.on_transport_rpc(
Expand Down
15 changes: 9 additions & 6 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
match self
.media_webrtc
.input(&mut self.switcher)
.spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.record), &req.sdp)
.spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whip(req.room, req.peer, req.userdata, req.record), &req.sdp)
{
Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))),
Expand All @@ -482,7 +482,7 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
match self
.media_webrtc
.input(&mut self.switcher)
.spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into()), &req.sdp)
.spawn(req.ip, req.session_id, transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.userdata), &req.sdp)
{
Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))),
Expand All @@ -502,11 +502,11 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
}
},
RpcReq::Webrtc(req) => match req {
webrtc::RpcReq::Connect(session_id, ip, user_agent, req, record) => {
webrtc::RpcReq::Connect(session_id, ip, user_agent, req, userdata, record) => {
match self
.media_webrtc
.input(&mut self.switcher)
.spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), record, self.secure.clone()), &req.sdp)
.spawn(ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), userdata, record, self.secure.clone()), &req.sdp)
{
Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(
req_id,
Expand All @@ -529,11 +529,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RemoteIce(req_id, transport_webrtc::Variant::Webrtc, ice.candidates)),
);
}
webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, record) => {
webrtc::RpcReq::RestartIce(conn, ip, user_agent, req, userdata, record) => {
log::info!("on rpc request {req_id}, webrtc::RpcReq::RestartIce");
self.media_webrtc.input(&mut self.switcher).on_event(
now,
GroupInput::Ext(conn.into(), transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, record)),
GroupInput::Ext(
conn.into(),
transport_webrtc::ExtIn::RestartIce(req_id, transport_webrtc::Variant::Webrtc, ip, user_agent, req, userdata, record),
),
);
}
webrtc::RpcReq::Delete(_) => todo!(),
Expand Down
Loading

0 comments on commit a0e80d9

Please sign in to comment.