diff --git a/bin/src/http.rs b/bin/src/http.rs index ea885ccd..951c0d24 100644 --- a/bin/src/http.rs +++ b/bin/src/http.rs @@ -68,6 +68,8 @@ pub async fn run_console_http_server( storage: crate::server::console_storage::StorageShared, connector: MediaConnectorServiceClient, ) -> Result<(), Box> { + use poem::middleware::Tracing; + let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/"); let user_ui = user_service.swagger_ui(); let user_spec = user_service.spec(); @@ -101,7 +103,8 @@ pub async fn run_console_http_server( .nest("/api/connector/", connector_service.data(ctx.clone())) .nest("/api/connector/ui", connector_ui) .at("/api/connector/spec", poem::endpoint::make_sync(move |_| connector_spec.clone())) - .with(Cors::new()); + .with(Cors::new()) + .with(Tracing::default()); Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?; Ok(()) diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index e69add29..6db4880a 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -549,8 +549,14 @@ impl MediaServerWorker { transport_webrtc::VariantParams::Whip(req.room, req.peer, req.extra_data, req.record), &req.sdp, ) { - Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))), - Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))), + Ok((_ice_lite, sdp, conn_id)) => { + log::info!("[MediaServerWorker] rpc request {req_id}, whip::RpcReq::Connect => created conn {conn_id}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))) + } + Err(e) => { + log::error!("[MediaServerWorker] rpc request {req_id}, whip::RpcReq::Connect => error {e:?}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))) + } } } whip::RpcReq::RemoteIce(req) => { @@ -579,8 +585,14 @@ impl MediaServerWorker { transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.extra_data), &req.sdp, ) { - Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))), - Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))), + Ok((_ice_lite, sdp, conn_id)) => { + log::info!("[MediaServerWorker] rpc request {req_id}, whep::RpcReq::Connect => created conn {conn_id}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))) + } + Err(e) => { + log::info!("[MediaServerWorker] rpc request {req_id}, whep::RpcReq::Connect => error {e:?}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))) + } } } whep::RpcReq::RemoteIce(req) => { @@ -606,18 +618,24 @@ impl MediaServerWorker { .input(&mut self.switcher) .spawn(app, ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), extra_data, record, self.secure.clone()), &req.sdp) { - Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc( - req_id, - RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok(( - conn_id, - ConnectResponse { - conn_id: "".to_string(), - sdp, - ice_lite, - }, - )))), - )), - Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(e))))), + Ok((ice_lite, sdp, conn_id)) => { + log::info!("[MediaServerWorker] rpc request {req_id}, webrtc::RpcReq::Connect => created conn {conn_id}"); + self.queue.push_back(Output::ExtRpc( + req_id, + RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok(( + conn_id, + ConnectResponse { + conn_id: "".to_string(), + sdp, + ice_lite, + }, + )))), + )) + } + Err(e) => { + log::error!("[MediaServerWorker] rpc request {req_id}, webrtc::RpcReq::Connect => error {e:?}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(e))))) + } } } webrtc::RpcReq::RemoteIce(conn, ice) => { @@ -653,8 +671,14 @@ impl MediaServerWorker { .input(&mut self.switcher) .spawn(conn_req.app, conn_req.room, conn_req.peer, false, conn_req.session_id, None) { - Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Ok((conn_id, sdp)))))), - Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Err(e))))), + Ok((conn_id, sdp)) => { + log::info!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateOffer => created conn {conn_id}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Ok((conn_id, sdp)))))) + } + Err(e) => { + log::error!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateOffer => error {e:?}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Err(e))))) + } } } rtpengine::RpcReq::SetAnswer(conn, answer_req) => { @@ -670,8 +694,14 @@ impl MediaServerWorker { .input(&mut self.switcher) .spawn(conn_req.app, conn_req.room, conn_req.peer, false, conn_req.session_id, Some(&conn_req.sdp)) { - Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Ok((conn_id, sdp)))))), - Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Err(e))))), + Ok((conn_id, sdp)) => { + log::info!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateAnswer => created conn {conn_id}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Ok((conn_id, sdp)))))) + } + Err(e) => { + log::error!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateAnswer => error {e:?}"); + self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Err(e))))) + } } } rtpengine::RpcReq::Delete(conn) => { diff --git a/packages/transport_rtpengine/src/transport.rs b/packages/transport_rtpengine/src/transport.rs index d7424116..fedbeaae 100644 --- a/packages/transport_rtpengine/src/transport.rs +++ b/packages/transport_rtpengine/src/transport.rs @@ -96,7 +96,11 @@ impl TransportRtpEngine { pub fn new_answer(room: RoomId, peer: PeerId, ip: IpAddr, offer: &str) -> Result<(Self, String), String> { let mut offer = SessionDescription::try_from(offer.to_string()).map_err(|e| e.to_string())?; - let dest_ip: IpAddr = offer.connection.ok_or("CONNECTION_NOT_FOUND".to_string())?.connection_address.base; + let dest_ip: IpAddr = if let Some(conn) = offer.connection { + conn.connection_address.base + } else { + offer.origin.unicast_address + }; let dest_port = offer.media_descriptions.pop().ok_or("MEDIA_NOT_FOUND".to_string())?.media.port; let remote = SocketAddr::new(dest_ip, dest_port);