diff --git a/Cargo.lock b/Cargo.lock index 4b91a56c..8edbeb62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2953,6 +2953,7 @@ dependencies = [ "atm0s-sdn", "log", "media-server-protocol", + "media-server-utils", "prost", "serde", ] diff --git a/bin/media_z0_n2.sh b/bin/media_z0_n2.sh index 4c9a0d06..3e2deb6d 100644 --- a/bin/media_z0_n2.sh +++ b/bin/media_z0_n2.sh @@ -9,4 +9,5 @@ cargo run -- \ --seeds 1@/ip4/127.0.0.1/udp/10001 \ --workers 2 \ media \ + --webrtc-port-seed 10200 \ --enable-token-api diff --git a/bin/media_z0_n3.sh b/bin/media_z0_n3.sh index 20951512..8aab69cd 100644 --- a/bin/media_z0_n3.sh +++ b/bin/media_z0_n3.sh @@ -9,4 +9,5 @@ cargo run -- \ --seeds 1@/ip4/127.0.0.1/udp/10001 \ --workers 2 \ media \ + --webrtc-port-seed 10300 \ --enable-token-api diff --git a/bin/media_z256_n1.sh b/bin/media_z256_n1.sh index 6fe41b5c..ba1d2940 100644 --- a/bin/media_z256_n1.sh +++ b/bin/media_z256_n1.sh @@ -9,4 +9,5 @@ cargo run -- \ --seeds 256@/ip4/127.0.0.1/udp/11000 \ --workers 2 \ media \ + --webrtc-port-seed 11200 \ --enable-token-api diff --git a/bin/media_z256_n2.sh b/bin/media_z256_n2.sh index 6a75a76e..a5d05764 100644 --- a/bin/media_z256_n2.sh +++ b/bin/media_z256_n2.sh @@ -9,4 +9,5 @@ cargo run -- \ --seeds 256@/ip4/127.0.0.1/udp/11000 \ --workers 2 \ media \ + --webrtc-port-seed 11300 \ --enable-token-api diff --git a/bin/src/errors.rs b/bin/src/errors.rs index 122b8798..c04fc83e 100644 --- a/bin/src/errors.rs +++ b/bin/src/errors.rs @@ -6,4 +6,5 @@ pub enum MediaServerError { NodePoolEmpty = 0x00020003, MediaResError = 0x00020004, NotImplemented = 0x00020005, + NodeTimeout = 0x00020006, } diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 86709fc9..c24913f1 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -241,6 +241,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod max_sessions = max; } media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res), + media_server_gateway::store_service::Event::FindDestRes(req_id, res) => requester.on_find_dest_res(req_id, res), }, SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event { media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {} diff --git a/bin/src/server/gateway/dest_selector.rs b/bin/src/server/gateway/dest_selector.rs index c31208d6..f0974654 100644 --- a/bin/src/server/gateway/dest_selector.rs +++ b/bin/src/server/gateway/dest_selector.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use atm0s_sdn::NodeId; use media_server_gateway::ServiceKind; use media_server_protocol::protobuf::cluster_gateway::ping_event::gateway_origin::Location; use tokio::sync::{ @@ -7,7 +8,10 @@ use tokio::sync::{ oneshot, }; -type QueryRequest = (ServiceKind, Option<(f32, f32)>, oneshot::Sender>); +enum QueryRequest { + Select(ServiceKind, Option<(f32, f32)>, oneshot::Sender>), + DestFor(ServiceKind, NodeId, oneshot::Sender>), +} #[derive(Clone)] pub struct GatewayDestSelector { @@ -15,9 +19,19 @@ pub struct GatewayDestSelector { } impl GatewayDestSelector { - pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option { + /// Select best destination, it can be media-node or other gateway node + pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option { + let (tx, rx) = oneshot::channel(); + self.tx.send(QueryRequest::Select(kind, location, tx)).await.ok()?; + rx.await.ok()? + } + + /// Find forward dest if we need to send request to a node. + /// if node is in current zone, then return Some(node) if it available + /// if node in other zone, return the zone gateway node + pub async fn dest_for(&self, kind: ServiceKind, node: NodeId) -> Option { let (tx, rx) = oneshot::channel(); - self.tx.send((kind, location, tx)).await.ok()?; + self.tx.send(QueryRequest::DestFor(kind, node, tx)).await.ok()?; rx.await.ok()? } } @@ -37,16 +51,33 @@ impl GatewayDestRequester { } } + pub fn on_find_dest_res(&mut self, req_id: u64, res: Option) { + if let Some(tx) = self.reqs.remove(&req_id) { + if tx.send(res).is_err() { + log::error!("[GatewayDestRequester] answer for req_id {req_id} error"); + } + } + } + pub fn recv(&mut self) -> Option { - let (kind, location, tx) = self.rx.try_recv().ok()?; - let req_id = self.req_seed; - self.req_seed += 1; - self.reqs.insert(req_id, tx); - Some(media_server_gateway::store_service::Control::FindNodeReq( - req_id, - kind, - location.map(|(lat, lon)| Location { lat, lon }), - )) + match self.rx.try_recv().ok()? { + QueryRequest::Select(kind, location, tx) => { + let req_id = self.req_seed; + self.req_seed += 1; + self.reqs.insert(req_id, tx); + Some(media_server_gateway::store_service::Control::FindNodeReq( + req_id, + kind, + location.map(|(lat, lon)| Location { lat, lon }), + )) + } + QueryRequest::DestFor(kind, dest, tx) => { + let req_id = self.req_seed; + self.req_seed += 1; + self.reqs.insert(req_id, tx); + Some(media_server_gateway::store_service::Control::FindDestReq(req_id, kind, dest)) + } + } } } diff --git a/bin/src/server/gateway/local_rpc_handler.rs b/bin/src/server/gateway/local_rpc_handler.rs index 06d50676..aa4661c4 100644 --- a/bin/src/server/gateway/local_rpc_handler.rs +++ b/bin/src/server/gateway/local_rpc_handler.rs @@ -316,7 +316,7 @@ impl MediaLocalRpcHandler { } } else { self.feedback_route_error(session_id, now_ms() - started_at, Some(node_id), ErrorType::Timeout).await; - Err(RpcError::new2(MediaServerError::GatewayRpcError)) + Err(RpcError::new2(MediaServerError::NodeTimeout)) } } else { self.feedback_route_error(session_id, now_ms() - started_at, None, ErrorType::PoolEmpty).await; @@ -325,22 +325,16 @@ impl MediaLocalRpcHandler { } async fn webrtc_remote_ice(&self, conn_part: Option<(NodeId, u64)>, conn: ClusterConnId, param: RemoteIceRequest) -> RpcResult { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest { - conn: conn.to_string(), - candidates: param.candidates, - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await; - if let Some(res) = res { - Ok(RemoteIceResponse { added: res.added }) - } else { - Err(RpcError::new2(MediaServerError::GatewayRpcError)) - } - } else { - Err(RpcError::new2(MediaServerError::InvalidConnId)) - } + let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?; + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest { + conn: conn.to_string(), + candidates: param.candidates, + }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await; + let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?; + Ok(RemoteIceResponse { added: res.added }) } #[allow(clippy::too_many_arguments)] @@ -354,31 +348,34 @@ impl MediaLocalRpcHandler { extra_data: Option, record: bool, ) -> RpcResult<(ClusterConnId, ConnectResponse)> { - //TODO how to handle media-node down? - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest { - conn: conn.to_string(), - ip: ip.to_string(), - user_agent, - req: Some(req), - record, - extra_data, - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await; - if let Some(res) = res { - if let Some(res) = res.res { - Ok((res.conn_id.parse().unwrap(), res)) - } else { - Err(RpcError::new2(MediaServerError::MediaResError)) + let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?; + let dest = match self.selector.dest_for(ServiceKind::Webrtc, node).await { + Some(dest) => dest, + None => match self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(&ip)).await { + Some(dest) => { + log::warn!("[Gateway] not found dest {node} found other node {dest} for restart-ice (reconnect to other server)"); + dest } - } else { - Err(RpcError::new2(MediaServerError::GatewayRpcError)) - } - } else { - Err(RpcError::new2(MediaServerError::InvalidConnId)) - } + None => { + log::warn!("[Gateway] node pool empty for restart-ice to dest {node}"); + return RpcResult::Err(RpcError::new2(MediaServerError::NodePoolEmpty)); + } + }, + }; + log::info!("[Gateway] selected dest node {dest} with provided node {node}"); + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest { + conn: conn.to_string(), + ip: ip.to_string(), + user_agent, + req: Some(req), + record, + extra_data, + }; + let sock_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT); + let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await; + let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?; + let res = res.res.ok_or(RpcError::new2(MediaServerError::MediaResError))?; + Ok((res.conn_id.parse().unwrap(), res)) } /* diff --git a/packages/media_gateway/Cargo.toml b/packages/media_gateway/Cargo.toml index 834131d7..3e998e67 100644 --- a/packages/media_gateway/Cargo.toml +++ b/packages/media_gateway/Cargo.toml @@ -9,5 +9,6 @@ edition = "2021" log = { workspace = true } serde = { version = "1.0", features = ["derive"] } media-server-protocol = { path = "../protocol" } +media-server-utils = { path = "../media_utils" } atm0s-sdn = { workspace = true } prost = { workspace = true } diff --git a/packages/media_gateway/src/store.rs b/packages/media_gateway/src/store.rs index b2f7a2fc..b82a8678 100644 --- a/packages/media_gateway/src/store.rs +++ b/packages/media_gateway/src/store.rs @@ -1,3 +1,4 @@ +use atm0s_sdn::NodeId; use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, Origin, ServiceStats}; use crate::{NodeMetrics, ServiceKind}; @@ -32,8 +33,8 @@ impl GatewayStore { pub fn new(zone: u32, location: Location, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self { Self { node: NodeMetrics::default(), - webrtc: ServiceStore::new(ServiceKind::Webrtc, location), - rtpengine: ServiceStore::new(ServiceKind::RtpEngine, location), + webrtc: ServiceStore::new(zone, ServiceKind::Webrtc, location), + rtpengine: ServiceStore::new(zone, ServiceKind::RtpEngine, location), zone, location, output: None, @@ -101,7 +102,7 @@ impl GatewayStore { } } - pub fn best_for(&self, kind: ServiceKind, location: Option) -> Option { + pub fn best_for(&self, kind: ServiceKind, location: Option) -> Option { let node = match kind { ServiceKind::Webrtc => self.webrtc.best_for(location), ServiceKind::RtpEngine => self.rtpengine.best_for(location), @@ -110,6 +111,15 @@ impl GatewayStore { node } + pub fn dest_for(&self, kind: ServiceKind, dest: NodeId) -> Option { + let node = match kind { + ServiceKind::Webrtc => self.webrtc.dest_for(dest), + ServiceKind::RtpEngine => self.rtpengine.dest_for(dest), + }; + log::debug!("[GatewayStore] query dest {:?} for node {} got {:?}", kind, dest, node); + node + } + pub fn local_stats(&self) -> Option { self.webrtc.local_stats() } diff --git a/packages/media_gateway/src/store/service.rs b/packages/media_gateway/src/store/service.rs index 65eef727..04749cad 100644 --- a/packages/media_gateway/src/store/service.rs +++ b/packages/media_gateway/src/store/service.rs @@ -1,4 +1,6 @@ +use atm0s_sdn::NodeId; use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, ServiceStats}; +use media_server_utils::node_zone_id; use crate::ServiceKind; @@ -22,6 +24,7 @@ struct ZoneSource { } pub struct ServiceStore { + zone: u32, kind: ServiceKind, location: Location, local_sources: Vec, @@ -29,9 +32,10 @@ pub struct ServiceStore { } impl ServiceStore { - pub fn new(kind: ServiceKind, location: Location) -> Self { + pub fn new(zone: u32, kind: ServiceKind, location: Location) -> Self { log::info!("[ServiceStore {:?}] create new in {:?}", kind, location); Self { + zone, kind, location, local_sources: vec![], @@ -153,6 +157,28 @@ impl ServiceStore { min_node } + /// If we in same zone then only check local registry + /// Else we forward it to the zone gateway if available + pub fn dest_for(&self, dest: NodeId) -> Option { + if node_zone_id(dest) == self.zone { + for n in self.local_sources.iter() { + if n.node == dest { + return Some(dest); + } + } + + None + } else { + for z in self.zone_sources.iter() { + if z.zone == node_zone_id(dest) { + return z.gateways.first().map(|s| s.node); + } + } + + None + } + } + pub fn local_stats(&self) -> Option { if self.local_sources.is_empty() { return None; @@ -225,7 +251,7 @@ mod tests { #[test] fn empty_store() { - let store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + let store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); assert_eq!(store.best_for(None), None); assert_eq!(store.best_for(Some(Location { lat: 1.0, lon: 1.0 })), None); @@ -234,7 +260,7 @@ mod tests { #[test] fn local_store() { - let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); store.on_node_ping(0, 2, 50, ServiceStats { live: 60, max: 1000, active: true }); @@ -259,7 +285,7 @@ mod tests { #[test] fn remote_zones_store() { - let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); store.on_gateway_ping(0, 256, 256, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); store.on_gateway_ping(0, 256, 257, 50, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); @@ -283,7 +309,7 @@ mod tests { #[test] fn local_and_remote_zones() { - let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); @@ -309,7 +335,7 @@ mod tests { #[test] fn clear_timeout() { - let mut store = ServiceStore::new(ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); @@ -322,4 +348,24 @@ mod tests { assert_eq!(store.local_sources.len(), 0); assert_eq!(store.zone_sources.len(), 0); } + + #[test] + fn dest_for_same_zone() { + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); + + assert_eq!(store.dest_for(1), Some(1)); + assert_eq!(store.dest_for(2), None); + } + + #[test] + fn dest_for_other_zone() { + let mut store = ServiceStore::new(0, ServiceKind::Webrtc, Location { lat: 1.0, lon: 1.0 }); + + store.on_node_ping(0, 1, 60, ServiceStats { live: 100, max: 1000, active: true }); + store.on_gateway_ping(0, 256, 257, 60, Location { lat: 2.0, lon: 2.0 }, 50, ServiceStats { live: 100, max: 1000, active: true }); + + assert_eq!(store.dest_for(260), Some(257)); + assert_eq!(store.dest_for(1024), None); + } } diff --git a/packages/media_gateway/src/store_service.rs b/packages/media_gateway/src/store_service.rs index 91b21838..707f3b45 100644 --- a/packages/media_gateway/src/store_service.rs +++ b/packages/media_gateway/src/store_service.rs @@ -6,7 +6,7 @@ use atm0s_sdn::{ ServiceWorkerOutput, }, features::{data, FeaturesControl, FeaturesEvent}, - RouteRule, ServiceBroadcastLevel, + NodeId, RouteRule, ServiceBroadcastLevel, }; use media_server_protocol::protobuf::{ self, @@ -23,6 +23,7 @@ use crate::{ pub enum Control { NodeStats(NodeMetrics), FindNodeReq(u64, ServiceKind, Option), + FindDestReq(u64, ServiceKind, NodeId), GetMediaStats, } @@ -30,6 +31,7 @@ pub enum Control { pub enum Event { MediaStats(u32, u32), FindNodeRes(u64, Option), + FindDestRes(u64, Option), } pub struct GatewayStoreService { @@ -132,6 +134,10 @@ where let out = self.store.best_for(kind, location); self.queue.push_back(ServiceOutput::Event(actor, Event::FindNodeRes(req_id, out).into())); } + Control::FindDestReq(req_id, kind, dest) => { + let out = self.store.dest_for(kind, dest); + self.queue.push_back(ServiceOutput::Event(actor, Event::FindDestRes(req_id, out).into())); + } Control::NodeStats(metrics) => { log::debug!("[GatewayStoreService] node metrics {:?}", metrics); self.store.on_node_metrics(now, metrics); diff --git a/packages/media_utils/src/lib.rs b/packages/media_utils/src/lib.rs index 5e64fbe7..0e0b821d 100644 --- a/packages/media_utils/src/lib.rs +++ b/packages/media_utils/src/lib.rs @@ -1,4 +1,5 @@ mod f16; +mod sdn; mod select; mod seq_extend; mod seq_rewrite; @@ -8,6 +9,7 @@ mod ts_rewrite; mod uri; pub use f16::{F16i, F16u}; +pub use sdn::*; pub use select::*; pub use seq_extend::RtpSeqExtend; pub use seq_rewrite::SeqRewrite; diff --git a/packages/media_utils/src/sdn.rs b/packages/media_utils/src/sdn.rs new file mode 100644 index 00000000..890a66b2 --- /dev/null +++ b/packages/media_utils/src/sdn.rs @@ -0,0 +1,3 @@ +pub fn node_zone_id(node: u32) -> u32 { + node & 0xFFFFFF00 +}