Skip to content

Commit

Permalink
fix: rtpengine create_answer error with sdp without connection line
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Oct 11, 2024
1 parent 44d06f1 commit 50184c3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 22 deletions.
5 changes: 4 additions & 1 deletion bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub async fn run_console_http_server(
storage: crate::server::console_storage::StorageShared,
connector: MediaConnectorServiceClient<SocketAddr, QuinnClient, QuinnStream>,
) -> Result<(), Box<dyn std::error::Error>> {
use poem::middleware::Tracing;

let user_service: OpenApiService<_, ()> = OpenApiService::new(api_console::user::Apis, "Console User APIs", env!("CARGO_PKG_VERSION")).server("/api/user/");
let user_ui = user_service.swagger_ui();
let user_spec = user_service.spec();
Expand Down Expand Up @@ -101,7 +103,8 @@ pub async fn run_console_http_server(
.nest("/api/connector/", connector_service.data(ctx.clone()))
.nest("/api/connector/ui", connector_ui)
.at("/api/connector/spec", poem::endpoint::make_sync(move |_| connector_spec.clone()))
.with(Cors::new());
.with(Cors::new())
.with(Tracing::default());

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
Expand Down
70 changes: 50 additions & 20 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
transport_webrtc::VariantParams::Whip(req.room, req.peer, req.extra_data, req.record),
&req.sdp,
) {
Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp }))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e))))),
Ok((_ice_lite, sdp, conn_id)) => {
log::info!("[MediaServerWorker] rpc request {req_id}, whip::RpcReq::Connect => created conn {conn_id}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Ok(WhipConnectRes { conn_id, sdp })))))
}
Err(e) => {
log::error!("[MediaServerWorker] rpc request {req_id}, whip::RpcReq::Connect => error {e:?}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whip(whip::RpcRes::Connect(Err(e)))))
}
}
}
whip::RpcReq::RemoteIce(req) => {
Expand Down Expand Up @@ -579,8 +585,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
transport_webrtc::VariantParams::Whep(req.room, peer_id.into(), req.extra_data),
&req.sdp,
) {
Ok((_ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp }))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e))))),
Ok((_ice_lite, sdp, conn_id)) => {
log::info!("[MediaServerWorker] rpc request {req_id}, whep::RpcReq::Connect => created conn {conn_id}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Ok(WhepConnectRes { conn_id, sdp })))))
}
Err(e) => {
log::info!("[MediaServerWorker] rpc request {req_id}, whep::RpcReq::Connect => error {e:?}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Whep(whep::RpcRes::Connect(Err(e)))))
}
}
}
whep::RpcReq::RemoteIce(req) => {
Expand All @@ -606,18 +618,24 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
.input(&mut self.switcher)
.spawn(app, ip, session_id, VariantParams::Webrtc(user_agent, req.clone(), extra_data, record, self.secure.clone()), &req.sdp)
{
Ok((ice_lite, sdp, conn_id)) => self.queue.push_back(Output::ExtRpc(
req_id,
RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok((
conn_id,
ConnectResponse {
conn_id: "".to_string(),
sdp,
ice_lite,
},
)))),
)),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(e))))),
Ok((ice_lite, sdp, conn_id)) => {
log::info!("[MediaServerWorker] rpc request {req_id}, webrtc::RpcReq::Connect => created conn {conn_id}");
self.queue.push_back(Output::ExtRpc(
req_id,
RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok((
conn_id,
ConnectResponse {
conn_id: "".to_string(),
sdp,
ice_lite,
},
)))),
))
}
Err(e) => {
log::error!("[MediaServerWorker] rpc request {req_id}, webrtc::RpcReq::Connect => error {e:?}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(e)))))
}
}
}
webrtc::RpcReq::RemoteIce(conn, ice) => {
Expand Down Expand Up @@ -653,8 +671,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
.input(&mut self.switcher)
.spawn(conn_req.app, conn_req.room, conn_req.peer, false, conn_req.session_id, None)
{
Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Ok((conn_id, sdp)))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Err(e))))),
Ok((conn_id, sdp)) => {
log::info!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateOffer => created conn {conn_id}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Ok((conn_id, sdp))))))
}
Err(e) => {
log::error!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateOffer => error {e:?}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateOffer(Err(e)))))
}
}
}
rtpengine::RpcReq::SetAnswer(conn, answer_req) => {
Expand All @@ -670,8 +694,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
.input(&mut self.switcher)
.spawn(conn_req.app, conn_req.room, conn_req.peer, false, conn_req.session_id, Some(&conn_req.sdp))
{
Ok((conn_id, sdp)) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Ok((conn_id, sdp)))))),
Err(e) => self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Err(e))))),
Ok((conn_id, sdp)) => {
log::info!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateAnswer => created conn {conn_id}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Ok((conn_id, sdp))))))
}
Err(e) => {
log::error!("[MediaServerWorker] rpc request {req_id}, rtpengine::RpcReq::CreateAnswer => error {e:?}");
self.queue.push_back(Output::ExtRpc(req_id, RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Err(e)))))
}
}
}
rtpengine::RpcReq::Delete(conn) => {
Expand Down
6 changes: 5 additions & 1 deletion packages/transport_rtpengine/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ impl TransportRtpEngine {

pub fn new_answer(room: RoomId, peer: PeerId, ip: IpAddr, offer: &str) -> Result<(Self, String), String> {
let mut offer = SessionDescription::try_from(offer.to_string()).map_err(|e| e.to_string())?;
let dest_ip: IpAddr = offer.connection.ok_or("CONNECTION_NOT_FOUND".to_string())?.connection_address.base;
let dest_ip: IpAddr = if let Some(conn) = offer.connection {
conn.connection_address.base
} else {
offer.origin.unicast_address
};
let dest_port = offer.media_descriptions.pop().ok_or("MEDIA_NOT_FOUND".to_string())?.media.port;
let remote = SocketAddr::new(dest_ip, dest_port);

Expand Down

0 comments on commit 50184c3

Please sign in to comment.