diff --git a/Cargo.lock b/Cargo.lock index b738028a..910a078e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,7 @@ dependencies = [ "rustls", "sans-io-runtime", "serde", + "sysinfo", "tokio", "tracing-subscriber", ] @@ -2004,6 +2005,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2807,6 +2817,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "rcgen" version = "0.13.1" @@ -3448,6 +3478,21 @@ dependencies = [ "futures-core", ] +[[package]] +name = "sysinfo" +version = "0.30.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "tempfile" version = "3.10.1" @@ -4015,6 +4060,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.5", +] + [[package]] name = "windows-core" version = "0.52.0" diff --git a/README.md b/README.md index 8db4e8ef..2f453606 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ We are actively refactoring entire media server and network stack with [sans-io- | MoQ | Media-over-Quic | ❌ | | Monitoring | Dashboard for monitoring | ❌ | | Recording | Record stream | ❌ | -| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | 🚧 | +| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | 🚀 | | Connector | External event handling | ❌ | Status: diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 9efc801c..0a32569a 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -28,12 +28,14 @@ convert-enum = { workspace = true } num_enum = { workspace = true } derive_more = { workspace = true } rcgen = { version = "0.13", optional = true } -maxminddb = "0.24.0" +maxminddb = { version = "0.24.0", optional = true } +sysinfo = { version = "0.30.12", optional = true } [features] default = ["gateway", "media", "connector", "cert_utils"] -gateway = ["media-server-gateway", "quinn_vnet"] -media = ["media-server-runner", "quinn_vnet"] +gateway = ["media-server-gateway", "quinn_vnet", "node_metrics", "maxminddb"] +media = ["media-server-runner", "quinn_vnet", "node_metrics"] connector = ["quinn_vnet"] cert_utils = ["rcgen", "rustls"] quinn_vnet = ["rustls", "quinn"] +node_metrics = ["sysinfo"] diff --git a/bin/src/errors.rs b/bin/src/errors.rs index 38eacb73..122b8798 100644 --- a/bin/src/errors.rs +++ b/bin/src/errors.rs @@ -3,4 +3,7 @@ pub enum MediaServerError { GatewayRpcError = 0x00020001, InvalidConnId = 0x00020002, + NodePoolEmpty = 0x00020003, + MediaResError = 0x00020004, + NotImplemented = 0x00020005, } diff --git a/bin/src/lib.rs b/bin/src/lib.rs index 7f05225f..fbd934ae 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -4,6 +4,8 @@ use atm0s_sdn::{NodeAddr, NodeId}; mod errors; mod http; +#[cfg(feature = "node_metrics")] +mod node_metrics; #[cfg(feature = "quinn_vnet")] mod quinn; pub mod server; diff --git a/bin/src/node_metrics.rs b/bin/src/node_metrics.rs new file mode 100644 index 00000000..da568b3a --- /dev/null +++ b/bin/src/node_metrics.rs @@ -0,0 +1,67 @@ +use std::{ + sync::mpsc::{channel, Receiver}, + time::Duration, +}; + +use media_server_gateway::NodeMetrics; +use sans_io_runtime::ErrorDebugger2; +use sysinfo::{Disks, System}; + +const REFRESH_INTERVAL_SECONDS: u64 = 2; + +pub struct NodeMetricsCollector { + rx: Receiver, +} + +impl Default for NodeMetricsCollector { + fn default() -> Self { + let (tx, rx) = channel(); + let mut sys = System::new_all(); + let mut disks = Disks::new(); + + disks.refresh_list(); + sys.refresh_all(); + sys.refresh_cpu(); + + std::thread::spawn(move || { + loop { + disks.refresh(); + sys.refresh_all(); + sys.refresh_cpu(); + + let mut sum = 0.0; + for cpu in sys.cpus() { + sum += cpu.cpu_usage(); + } + + let mut disk_used = 0; + let mut disk_sum = 0; + for disk in disks.iter() { + disk_sum += disk.total_space(); + disk_used += disk.total_space() - disk.available_space(); + } + + tx.send(NodeMetrics { + cpu: (sum as usize / sys.cpus().len()) as u8, + memory: (100 * sys.used_memory() / sys.total_memory()) as u8, + disk: (100 * disk_used / disk_sum) as u8, + }) + .print_err2("Collect node metrics error"); + + // Sleeping to let time for the system to run for long + // enough to have useful information. + std::thread::sleep(Duration::from_secs(REFRESH_INTERVAL_SECONDS)); + } + }); + + Self { rx } + } +} + +impl NodeMetricsCollector { + /// Only return data in each interval, if not return None. + /// Node that this method must node blocking thread + pub fn pop_measure(&mut self) -> Option { + self.rx.try_recv().ok() + } +} diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 4b9bfa6c..a0b2b84c 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -2,35 +2,29 @@ use std::{sync::Arc, time::Duration}; use atm0s_sdn::{features::FeaturesEvent, secure::StaticKeyAuthorization, services::visualization, SdnBuilder, SdnControllerUtils, SdnExtOut, SdnOwner}; use clap::Parser; -use media_server_gateway::{store_service::GatewayStoreServiceBuilder, ServiceKind, STORE_SERVICE_ID}; +use media_server_gateway::{store_service::GatewayStoreServiceBuilder, STORE_SERVICE_ID}; use media_server_protocol::{ gateway::{generate_gateway_zone_tag, GATEWAY_RPC_PORT}, - protobuf::{ - cluster_gateway::{MediaEdgeServiceClient, MediaEdgeServiceServer}, - gateway::RemoteIceResponse, - }, - rpc::{ - node_vnet_addr, - quinn::{QuinnClient, QuinnServer}, - }, - transport::{webrtc, whep, whip, RpcError, RpcReq, RpcRes}, + protobuf::cluster_gateway::{MediaEdgeServiceClient, MediaEdgeServiceServer}, + rpc::quinn::{QuinnClient, QuinnServer}, }; use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use crate::{ - errors::MediaServerError, http::run_gateway_http_server, + node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, make_quinn_server, VirtualNetwork}, NodeConfig, }; use sans_io_runtime::{backend::PollingBackend, ErrorDebugger2}; -use self::{dest_selector::build_dest_selector, ip_location::Ip2Location}; +use self::{dest_selector::build_dest_selector, ip_location::Ip2Location, local_rpc_handler::MediaLocalRpcHandler}; mod dest_selector; mod ip_location; -mod rpc_handler; +mod local_rpc_handler; +mod remote_rpc_handler; #[derive(Clone, Debug, convert_enum::From, convert_enum::TryInto)] enum SC { @@ -59,6 +53,18 @@ pub struct Args { /// GeoIp database #[arg(env, long, default_value = "./maxminddb-data/GeoLite2-City.mmdb")] geo_db: String, + + /// Max cpu usage (in percent) of media-node or gateway-node we allow to route to + #[arg(env, long, default_value_t = 60)] + max_cpu: u8, + + /// Max memory usage (in percent) of media-node or gateway-node we allow to route to + #[arg(env, long, default_value_t = 80)] + max_memory: u8, + + /// Max disk usage (in percent) of media-node or gateway-node we allow to route to + #[arg(env, long, default_value_t = 90)] + max_disk: u8, } pub async fn run_media_gateway(workers: usize, http_port: Option, node: NodeConfig, args: Args) { @@ -86,7 +92,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod builder.set_authorization(StaticKeyAuthorization::new(&node.secret)); builder.set_manual_discovery(vec!["gateway".to_string(), generate_gateway_zone_tag(node.zone)], vec!["gateway".to_string()]); - builder.add_service(Arc::new(GatewayStoreServiceBuilder::new(node.zone, args.lat, args.lon))); + builder.add_service(Arc::new(GatewayStoreServiceBuilder::new(node.zone, args.lat, args.lon, args.max_cpu, args.max_memory, args.max_disk))); for seed in node.seeds { builder.add_seed(seed); @@ -111,24 +117,35 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let media_rpc_socket = vnet.udp_socket(GATEWAY_RPC_PORT).await.expect("Should open virtual port for gateway rpc"); let mut media_rpc_server = MediaEdgeServiceServer::new( QuinnServer::new(make_quinn_server(media_rpc_socket, default_cluster_key, default_cluster_cert.clone()).expect("Should create endpoint for media rpc server")), - rpc_handler::Ctx { + remote_rpc_handler::Ctx { selector: selector.clone(), client: media_rpc_client.clone(), ip2location: ip2location.clone(), }, - rpc_handler::MediaRpcHandlerImpl::default(), + remote_rpc_handler::MediaRemoteRpcHandlerImpl::default(), ); + let local_rpc_processor = Arc::new(MediaLocalRpcHandler::new(selector, media_rpc_client, ip2location)); + tokio::task::spawn_local(async move { media_rpc_server.run().await; }); tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); + // Collect node metrics for update to gateway agent service, this information is used inside gateway + // for forwarding from other gateway + let mut node_metrics_collector = NodeMetricsCollector::default(); + loop { if controller.process().is_none() { break; } + + // Pop from metric collector and pass to Gateway store service + if let Some(metrics) = node_metrics_collector.pop_measure() { + controller.service_control(STORE_SERVICE_ID.into(), (), media_server_gateway::store_service::Control::NodeStats(metrics).into()); + } while let Ok(control) = vnet_rx.try_recv() { controller.feature_control((), control.into()); } @@ -139,224 +156,11 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let res_tx = req.answer_tx; let param = req.req; let conn_part = param.get_conn_part(); - let selector = selector.clone(); - let client = media_rpc_client.clone(); - let ip2location = ip2location.clone(); + let local_rpc_processor = local_rpc_processor.clone(); + tokio::spawn(async move { - match param { - RpcReq::Whip(param) => match param { - whip::RpcReq::Connect(param) => { - if let Some(selected) = selector.select(ServiceKind::Webrtc, ip2location.get_location(¶m.ip)).await { - let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); - log::info!("[Gateway] selected node {selected}"); - let rpc_req = param.into(); - let res = client.whip_connect(sock_addr, rpc_req).await; - log::info!("[Gateway] response from node {selected} => {:?}", res); - if let Some(res) = res { - res_tx - .send(RpcRes::Whip(whip::RpcRes::Connect(Ok(whip::WhipConnectRes { - sdp: res.sdp, - conn_id: res.conn.parse().unwrap(), - })))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whip(whip::RpcRes::Connect(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } - } - whip::RpcReq::RemoteIce(req) => { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhipRemoteIceRequest { - conn: req.conn_id.to_string(), - ice: req.ice, - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.whip_remote_ice(sock_addr, rpc_req).await; - if let Some(_res) = res { - res_tx - .send(RpcRes::Whip(whip::RpcRes::RemoteIce(Ok(whip::WhipRemoteIceRes {})))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whip(whip::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Whip(whip::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - whip::RpcReq::Delete(req) => { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhipCloseRequest { conn: req.conn_id.to_string() }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.whip_close(sock_addr, rpc_req).await; - if let Some(_res) = res { - res_tx.send(RpcRes::Whip(whip::RpcRes::Delete(Ok(whip::WhipDeleteRes {})))).print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whip(whip::RpcRes::Delete(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Whip(whip::RpcRes::Delete(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - }, - RpcReq::Whep(param) => match param { - whep::RpcReq::Connect(param) => { - if let Some(selected) = selector.select(ServiceKind::Webrtc, ip2location.get_location(¶m.ip)).await { - let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); - log::info!("[Gateway] selected node {selected}"); - let rpc_req = param.into(); - let res = client.whep_connect(sock_addr, rpc_req).await; - log::info!("[Gateway] response from node {selected} => {:?}", res); - if let Some(res) = res { - res_tx - .send(RpcRes::Whep(whep::RpcRes::Connect(Ok(whep::WhepConnectRes { - sdp: res.sdp, - conn_id: res.conn.parse().unwrap(), - })))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whep(whep::RpcRes::Connect(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } - } - whep::RpcReq::RemoteIce(req) => { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhepRemoteIceRequest { - conn: req.conn_id.to_string(), - ice: req.ice, - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.whep_remote_ice(sock_addr, rpc_req).await; - if let Some(_res) = res { - res_tx - .send(RpcRes::Whep(whep::RpcRes::RemoteIce(Ok(whep::WhepRemoteIceRes {})))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whep(whep::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Whep(whep::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - whep::RpcReq::Delete(req) => { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhepCloseRequest { conn: req.conn_id.to_string() }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.whep_close(sock_addr, rpc_req).await; - if let Some(_res) = res { - res_tx.send(RpcRes::Whep(whep::RpcRes::Delete(Ok(whep::WhepDeleteRes {})))).print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Whep(whep::RpcRes::Delete(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Whep(whep::RpcRes::Delete(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - }, - RpcReq::Webrtc(param) => match param { - webrtc::RpcReq::Connect(ip, user_agent, req) => { - if let Some(selected) = selector.select(ServiceKind::Webrtc, ip2location.get_location(&ip)).await { - let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); - log::info!("[Gateway] selected node {selected}"); - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcConnectRequest { - user_agent, - ip: ip.to_string(), - req: Some(req), - }; - let res = client.webrtc_connect(sock_addr, rpc_req).await; - log::info!("[Gateway] response from node {selected} => {:?}", res); - if let Some(res) = res { - let res = res.res.unwrap(); - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::Connect(Ok((res.conn_id.parse().unwrap(), res))))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::Connect(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } - } - webrtc::RpcReq::RemoteIce(conn, ice) => { - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest { - conn: conn.to_string(), - candidates: ice.candidates, - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.webrtc_remote_ice(sock_addr, rpc_req).await; - if let Some(res) = res { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(Ok(RemoteIceResponse { added: res.added })))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - webrtc::RpcReq::RestartIce(conn, ip, user_agent, req) => { - //TODO how to handle media-node down? - if let Some((node, _session)) = conn_part { - let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest { - conn: conn.to_string(), - ip: ip.to_string(), - user_agent, - req: Some(req), - }; - log::info!("[Gateway] selected node {node}"); - let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); - let res = client.webrtc_restart_ice(sock_addr, rpc_req).await; - if let Some(res) = res { - let res = res.res.unwrap(); - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RestartIce(Ok((res.conn_id.parse().unwrap(), res))))) - .print_err2("answer http request error"); - } else { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RestartIce(Err(RpcError::new2(MediaServerError::GatewayRpcError))))) - .print_err2("answer http request error"); - } - } else { - res_tx - .send(RpcRes::Webrtc(webrtc::RpcRes::RestartIce(Err(RpcError::new2(MediaServerError::InvalidConnId))))) - .print_err2("answer http request error"); - } - } - webrtc::RpcReq::Delete(_) => { - //TODO implement delete webrtc conn - } - }, - } + let res = local_rpc_processor.process_req(conn_part, param).await; + res_tx.send(res).print_err2("answer http request error"); }); } diff --git a/bin/src/server/gateway/dest_selector.rs b/bin/src/server/gateway/dest_selector.rs index 6751b37b..5bbd166d 100644 --- a/bin/src/server/gateway/dest_selector.rs +++ b/bin/src/server/gateway/dest_selector.rs @@ -65,3 +65,5 @@ pub fn build_dest_selector() -> (GatewayDestSelector, GatewayDestRequester) { }, ) } + +//TODO test diff --git a/bin/src/server/gateway/local_rpc_handler.rs b/bin/src/server/gateway/local_rpc_handler.rs new file mode 100644 index 00000000..ea628653 --- /dev/null +++ b/bin/src/server/gateway/local_rpc_handler.rs @@ -0,0 +1,263 @@ +use std::{ + net::{IpAddr, SocketAddr}, + sync::Arc, +}; + +use atm0s_sdn::NodeId; +use media_server_gateway::ServiceKind; +use media_server_protocol::{ + endpoint::ClusterConnId, + gateway::GATEWAY_RPC_PORT, + protobuf::{ + cluster_gateway::MediaEdgeServiceClient, + gateway::{ConnectRequest, ConnectResponse, RemoteIceRequest, RemoteIceResponse}, + }, + rpc::{ + node_vnet_addr, + quinn::{QuinnClient, QuinnStream}, + }, + transport::{ + webrtc, + whep::{self, WhepConnectReq, WhepConnectRes, WhepDeleteReq, WhepDeleteRes, WhepRemoteIceReq, WhepRemoteIceRes}, + whip::{self, WhipConnectReq, WhipConnectRes, WhipDeleteReq, WhipDeleteRes, WhipRemoteIceReq, WhipRemoteIceRes}, + RpcError, RpcReq, RpcRes, RpcResult, + }, +}; + +use crate::errors::MediaServerError; + +use super::{dest_selector::GatewayDestSelector, ip_location::Ip2Location}; + +pub struct MediaLocalRpcHandler { + selector: GatewayDestSelector, + client: MediaEdgeServiceClient, + ip2location: Arc, +} + +impl MediaLocalRpcHandler { + pub fn new(selector: GatewayDestSelector, client: MediaEdgeServiceClient, ip2location: Arc) -> Self { + Self { selector, client, ip2location } + } + + pub async fn process_req(&self, conn_part: Option<(NodeId, u64)>, param: RpcReq) -> RpcRes { + match param { + RpcReq::Whip(param) => match param { + whip::RpcReq::Connect(param) => RpcRes::Whip(whip::RpcRes::Connect(self.whip_connect(param).await)), + whip::RpcReq::RemoteIce(param) => RpcRes::Whip(whip::RpcRes::RemoteIce(self.whip_remote_ice(conn_part, param).await)), + whip::RpcReq::Delete(param) => RpcRes::Whip(whip::RpcRes::Delete(self.whip_delete(conn_part, param).await)), + }, + RpcReq::Whep(param) => match param { + whep::RpcReq::Connect(param) => RpcRes::Whep(whep::RpcRes::Connect(self.whep_connect(param).await)), + whep::RpcReq::RemoteIce(param) => RpcRes::Whep(whep::RpcRes::RemoteIce(self.whep_remote_ice(conn_part, param).await)), + whep::RpcReq::Delete(param) => RpcRes::Whep(whep::RpcRes::Delete(self.whep_delete(conn_part, param).await)), + }, + RpcReq::Webrtc(param) => match param { + webrtc::RpcReq::Connect(ip, user_agent, param) => RpcRes::Webrtc(webrtc::RpcRes::Connect(self.webrtc_connect(ip, user_agent, param).await)), + webrtc::RpcReq::RemoteIce(conn, param) => RpcRes::Webrtc(webrtc::RpcRes::RemoteIce(self.webrtc_remote_ice(conn_part, conn, param).await)), + webrtc::RpcReq::RestartIce(conn, ip, user_agent, req) => RpcRes::Webrtc(webrtc::RpcRes::RestartIce(self.webrtc_restart_ice(conn_part, conn, ip, user_agent, req).await)), + webrtc::RpcReq::Delete(_) => { + //TODO implement delete webrtc conn + RpcRes::Webrtc(webrtc::RpcRes::RestartIce(Err(RpcError::new2(MediaServerError::NotImplemented)))) + } + }, + } + } + + /* + Whip part + */ + + async fn whip_connect(&self, param: WhipConnectReq) -> RpcResult> { + if let Some(selected) = self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(¶m.ip)).await { + let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); + log::info!("[Gateway] selected node {selected}"); + let rpc_req = param.into(); + let res = self.client.whip_connect(sock_addr, rpc_req).await; + log::info!("[Gateway] response from node {selected} => {:?}", res); + if let Some(res) = res { + Ok(whip::WhipConnectRes { + sdp: res.sdp, + conn_id: res.conn.parse().unwrap(), + }) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::NodePoolEmpty)) + } + } + + async fn whip_remote_ice(&self, conn_part: Option<(NodeId, u64)>, param: WhipRemoteIceReq) -> RpcResult { + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhipRemoteIceRequest { + conn: param.conn_id.to_string(), + ice: param.ice, + }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.whip_remote_ice(sock_addr, rpc_req).await; + if let Some(_res) = res { + Ok(whip::WhipRemoteIceRes {}) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } + + async fn whip_delete(&self, conn_part: Option<(NodeId, u64)>, param: WhipDeleteReq) -> RpcResult { + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhipCloseRequest { conn: param.conn_id.to_string() }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.whip_close(sock_addr, rpc_req).await; + if let Some(_res) = res { + Ok(whip::WhipDeleteRes {}) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } + + /* + Whep part + */ + + async fn whep_connect(&self, param: WhepConnectReq) -> RpcResult> { + if let Some(selected) = self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(¶m.ip)).await { + let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); + log::info!("[Gateway] selected node {selected}"); + let rpc_req = param.into(); + let res = self.client.whep_connect(sock_addr, rpc_req).await; + log::info!("[Gateway] response from node {selected} => {:?}", res); + if let Some(res) = res { + Ok(whep::WhepConnectRes { + sdp: res.sdp, + conn_id: res.conn.parse().unwrap(), + }) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::NodePoolEmpty)) + } + } + + async fn whep_remote_ice(&self, conn_part: Option<(NodeId, u64)>, param: WhepRemoteIceReq) -> RpcResult { + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhepRemoteIceRequest { + conn: param.conn_id.to_string(), + ice: param.ice, + }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.whep_remote_ice(sock_addr, rpc_req).await; + if let Some(_res) = res { + Ok(whep::WhepRemoteIceRes {}) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } + + async fn whep_delete(&self, conn_part: Option<(NodeId, u64)>, param: WhepDeleteReq) -> RpcResult { + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WhepCloseRequest { conn: param.conn_id.to_string() }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.whep_close(sock_addr, rpc_req).await; + if let Some(_res) = res { + Ok(whep::WhepDeleteRes {}) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } + + /* + Webrtc part + */ + + async fn webrtc_connect(&self, ip: IpAddr, user_agent: String, req: ConnectRequest) -> RpcResult<(ClusterConnId, ConnectResponse)> { + if let Some(selected) = self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(&ip)).await { + let sock_addr = node_vnet_addr(selected, GATEWAY_RPC_PORT); + log::info!("[Gateway] selected node {selected}"); + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcConnectRequest { + user_agent, + ip: ip.to_string(), + req: Some(req), + }; + let res = self.client.webrtc_connect(sock_addr, rpc_req).await; + log::info!("[Gateway] response from node {selected} => {:?}", res); + if let Some(res) = res { + if let Some(res) = res.res { + if let Ok(conn) = res.conn_id.parse() { + Ok((conn, res)) + } else { + Err(RpcError::new2(MediaServerError::MediaResError)) + } + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::NodePoolEmpty)) + } + } + + async fn webrtc_remote_ice(&self, conn_part: Option<(NodeId, u64)>, conn: ClusterConnId, param: RemoteIceRequest) -> RpcResult { + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest { + conn: conn.to_string(), + candidates: param.candidates, + }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await; + if let Some(res) = res { + Ok(RemoteIceResponse { added: res.added }) + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } + + async fn webrtc_restart_ice(&self, conn_part: Option<(NodeId, u64)>, conn: ClusterConnId, ip: IpAddr, user_agent: String, req: ConnectRequest) -> RpcResult<(ClusterConnId, ConnectResponse)> { + //TODO how to handle media-node down? + if let Some((node, _session)) = conn_part { + let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest { + conn: conn.to_string(), + ip: ip.to_string(), + user_agent, + req: Some(req), + }; + log::info!("[Gateway] selected node {node}"); + let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT); + let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await; + if let Some(res) = res { + if let Some(res) = res.res { + Ok((res.conn_id.parse().unwrap(), res)) + } else { + Err(RpcError::new2(MediaServerError::MediaResError)) + } + } else { + Err(RpcError::new2(MediaServerError::GatewayRpcError)) + } + } else { + Err(RpcError::new2(MediaServerError::InvalidConnId)) + } + } +} + +//TODO test diff --git a/bin/src/server/gateway/rpc_handler.rs b/bin/src/server/gateway/remote_rpc_handler.rs similarity index 97% rename from bin/src/server/gateway/rpc_handler.rs rename to bin/src/server/gateway/remote_rpc_handler.rs index d34fb627..390528ef 100644 --- a/bin/src/server/gateway/rpc_handler.rs +++ b/bin/src/server/gateway/remote_rpc_handler.rs @@ -26,9 +26,9 @@ pub struct Ctx { } #[derive(Default)] -pub struct MediaRpcHandlerImpl {} +pub struct MediaRemoteRpcHandlerImpl {} -impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { +impl MediaEdgeServiceHandler for MediaRemoteRpcHandlerImpl { async fn whip_connect(&self, ctx: &Ctx, req: WhipConnectRequest) -> Option { log::info!("On whip_connect from other gateway"); let location = req.ip.parse().ok().and_then(|ip| ctx.ip2location.get_location(&ip)); @@ -101,3 +101,5 @@ impl MediaEdgeServiceHandler for MediaRpcHandlerImpl { ctx.client.webrtc_restart_ice(dest_addr, req).await } } + +//TODO test diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 1a13a424..8bb35e6e 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -7,6 +7,7 @@ use std::{ use atm0s_sdn::{features::FeaturesEvent, SdnExtIn, SdnExtOut}; use clap::Parser; +use media_server_gateway::{ServiceKind, AGENT_SERVICE_ID}; use media_server_protocol::{gateway::GATEWAY_RPC_PORT, protobuf::cluster_gateway::MediaEdgeServiceServer, rpc::quinn::QuinnServer}; use media_server_runner::MediaConfig; use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; @@ -16,6 +17,7 @@ use sans_io_runtime::{backend::PollingBackend, Controller}; use crate::{ http::run_media_http_server, + node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, server::media::runtime_worker::MediaRuntimeWorker, NodeConfig, @@ -47,6 +49,10 @@ pub struct Args { /// Custom binding address for WebRTC UDP #[arg(env, long)] custom_ips: Vec, + + /// Max ccu per core + #[arg(env, long, default_value_t = 200)] + ccu_per_core: u32, } pub async fn run_media_server(workers: usize, http_port: Option, node: NodeConfig, args: Args) { @@ -94,6 +100,7 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node webrtc_addrs: webrtc_addrs.clone(), ice_lite: args.ice_lite, secure: secure.clone(), + max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core)]), }, }; controller.add_worker::<_, _, MediaRuntimeWorker<_>, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None); @@ -125,10 +132,26 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node tokio::task::spawn_local(async move { while vnet.recv().await.is_some() {} }); + // Collect node metrics for update to gateway agent service, this information is used inside gateway + // for routing to best media-node + let mut node_metrics_collector = NodeMetricsCollector::default(); + loop { if controller.process().is_none() { break; } + + // Pop from metric collector and pass to Gateway agent service + if let Some(metrics) = node_metrics_collector.pop_measure() { + controller.send_to( + 0, //because sdn controller allway is run inside worker 0 + ExtIn::Sdn(SdnExtIn::ServicesControl( + AGENT_SERVICE_ID.into(), + 0.into(), + media_server_gateway::agent_service::Control::NodeStats(metrics).into(), + )), + ); + } while let Ok(control) = vnet_rx.try_recv() { controller.send_to_best(ExtIn::Sdn(SdnExtIn::FeaturesControl(0.into(), control.into()))); } diff --git a/bin/src/server/media/runtime_worker.rs b/bin/src/server/media/runtime_worker.rs index 242431aa..aa668363 100644 --- a/bin/src/server/media/runtime_worker.rs +++ b/bin/src/server/media/runtime_worker.rs @@ -54,6 +54,7 @@ impl WorkerInner, +} + +impl Into for &ServiceWorkersStats { + fn into(self) -> ServiceStats { + ServiceStats { + live: self.workers.values().sum(), + max: self.max, + active: true, //TODO how to update this? maybe with gradeful-shutdown + } + } +} #[derive(Debug, Clone)] pub enum Control { - Stats(Vec<(ServiceKind, u32)>), + NodeStats(NodeMetrics), + WorkerUsage(ServiceKind, u16, u32), } #[derive(Debug, Clone)] @@ -27,14 +47,18 @@ pub enum Event {} pub struct GatewayAgentService { output: Option>, seq: u16, + node: NodeMetrics, + services: HashMap, _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } -impl Default for GatewayAgentService { - fn default() -> Self { +impl GatewayAgentService { + pub fn new(max: HashMap) -> Self { Self { output: None, seq: 0, + node: Default::default(), + services: HashMap::from_iter(max.into_iter().map(|(k, v)| (k, ServiceWorkersStats { max: v, workers: HashMap::new() }))), _tmp: std::marker::PhantomData, } } @@ -42,7 +66,7 @@ impl Default for GatewayAgentService Service for GatewayAgentService where - SC: From + TryInto, + SC: From + TryInto + Debug, SE: From + TryInto, { fn service_id(&self) -> u8 { @@ -62,22 +86,39 @@ where meta.source = true; let data = protobuf::cluster_gateway::GatewayEvent { event: Some(gateway_event::Event::Ping(protobuf::cluster_gateway::PingEvent { - cpu: 0, //TODO - memory: 0, //TODO - disk: 0, //TODO - webrtc: Some(ServiceStats { active: true, live: 0, max: 100 }), + cpu: self.node.cpu as u32, + memory: self.node.memory as u32, + disk: self.node.disk as u32, + webrtc: self.services.get(&ServiceKind::Webrtc).map(|s| s.into()), origin: Some(Origin::Media(MediaOrigin {})), })), } .encode_to_vec(); - log::info!("[GatewayAgent] broadcast ping to zone gateways"); + log::debug!("[GatewayAgent] broadcast ping to zone gateways"); self.output = Some(ServiceOutput::FeatureControl(data::Control::DataSendRule(DATA_PORT, rule, meta, data).into())); } ServiceSharedInput::Connection(_) => {} } } - fn on_input(&mut self, _ctx: &ServiceCtx, _now: u64, _input: ServiceInput) {} + fn on_input(&mut self, _ctx: &ServiceCtx, _now: u64, input: ServiceInput) { + match input { + ServiceInput::Control(_, control) => match return_if_err!(control.try_into()) { + Control::NodeStats(metrics) => { + log::debug!("[GatewayAgentService] node metrics {:?}", metrics); + self.node = metrics; + } + Control::WorkerUsage(kind, worker, live) => { + log::debug!("[GatewayAgentService] worker {worker} live {live}"); + if let Some(service) = self.services.get_mut(&kind) { + service.workers.insert(worker, live); + } + } + }, + ServiceInput::FromWorker(_) => {} + ServiceInput::FeatureEvent(_) => {} + } + } fn pop_output2(&mut self, _now: u64) -> Option> { self.output.take() @@ -113,12 +154,13 @@ impl ServiceWorker { + max: HashMap, _tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>, } -impl Default for GatewayAgentServiceBuilder { - fn default() -> Self { - Self { _tmp: std::marker::PhantomData } +impl GatewayAgentServiceBuilder { + pub fn new(max: HashMap) -> Self { + Self { max, _tmp: std::marker::PhantomData } } } @@ -143,7 +185,7 @@ where } fn create(&self) -> Box> { - Box::>::default() + Box::new(GatewayAgentService::new(self.max.clone())) } fn create_worker(&self) -> Box> { diff --git a/packages/media_gateway/src/lib.rs b/packages/media_gateway/src/lib.rs index efc08678..e6656fe5 100644 --- a/packages/media_gateway/src/lib.rs +++ b/packages/media_gateway/src/lib.rs @@ -2,11 +2,18 @@ pub mod agent_service; mod store; pub mod store_service; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub enum ServiceKind { Webrtc, } +#[derive(Debug, Clone, Default)] +pub struct NodeMetrics { + pub cpu: u8, + pub memory: u8, + pub disk: u8, +} + pub const DATA_PORT: u16 = 10001; pub const STORE_SERVICE_ID: u8 = 101; diff --git a/packages/media_gateway/src/store.rs b/packages/media_gateway/src/store.rs index 032fed86..df91afc2 100644 --- a/packages/media_gateway/src/store.rs +++ b/packages/media_gateway/src/store.rs @@ -1,14 +1,11 @@ use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, Origin, ServiceStats}; -use crate::ServiceKind; +use crate::{NodeMetrics, ServiceKind}; use self::service::ServiceStore; mod service; -const MAX_MEMORY_USAGE: u8 = 80; -const MAX_DISK_USAGE: u8 = 90; - #[derive(Debug, PartialEq)] pub struct PingEvent { pub cpu: u8, @@ -20,28 +17,40 @@ pub struct PingEvent { pub struct GatewayStore { zone: u32, + node: NodeMetrics, location: Location, webrtc: ServiceStore, output: Option, + max_cpu: u8, + max_memory: u8, + max_disk: u8, } impl GatewayStore { - pub fn new(zone: u32, location: Location) -> Self { + pub fn new(zone: u32, location: Location, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self { Self { + node: NodeMetrics::default(), webrtc: ServiceStore::new(ServiceKind::Webrtc, location.clone()), zone, location, output: None, + max_cpu, + max_disk, + max_memory, } } + pub fn on_node_metrics(&mut self, _now: u64, metrics: NodeMetrics) { + self.node = metrics; + } + pub fn on_tick(&mut self, now: u64) { self.webrtc.on_tick(now); let ping = PingEvent { - cpu: 0, //TODO - memory: 0, //TODO - disk: 0, //TODO + cpu: self.node.cpu, + memory: self.node.memory, + disk: self.node.disk, origin: Origin::Gateway(GatewayOrigin { zone: self.zone, location: Some(self.location.clone()), @@ -55,8 +64,8 @@ impl GatewayStore { pub fn on_ping(&mut self, now: u64, from: u32, ping: PingEvent) { log::debug!("[GatewayStore] on ping from {from} data {:?}", ping); - let node_usage = node_usage(&ping); - let webrtc_usage = webrtc_usage(&ping); + let node_usage = node_usage(&ping, self.max_cpu, self.max_memory, self.max_disk); + let webrtc_usage = webrtc_usage(&ping, self.max_cpu, self.max_memory, self.max_disk); match ping.origin { Origin::Media(_) => match (node_usage, webrtc_usage, ping.webrtc) { (Some(_node), Some(webrtc), Some(stats)) => self.webrtc.on_node_ping(now, from, webrtc, stats), @@ -88,24 +97,32 @@ impl GatewayStore { } } -fn node_usage(ping: &PingEvent) -> Option { - if ping.memory >= MAX_MEMORY_USAGE { +fn node_usage(ping: &PingEvent, max_cpu: u8, max_memory: u8, max_disk: u8) -> Option { + if ping.memory >= max_cpu { + return None; + } + + if ping.memory >= max_memory { return None; } - if ping.disk >= MAX_DISK_USAGE { + if ping.disk >= max_disk { return None; } Some(ping.cpu) } -fn webrtc_usage(ping: &PingEvent) -> Option { - if ping.memory >= MAX_MEMORY_USAGE { +fn webrtc_usage(ping: &PingEvent, max_cpu: u8, max_memory: u8, max_disk: u8) -> Option { + if ping.memory >= max_cpu { return None; } - if ping.disk >= MAX_DISK_USAGE { + if ping.memory >= max_memory { + return None; + } + + if ping.disk >= max_disk { return None; } @@ -123,7 +140,7 @@ mod tests { #[test] fn local_ping() { - let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }, 60, 80, 90); store.on_ping( 0, 1, @@ -157,7 +174,7 @@ mod tests { #[test] fn local_reject_max_usage() { - let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }, 60, 80, 90); store.on_ping( 0, 1, @@ -182,12 +199,24 @@ mod tests { }, ); + store.on_ping( + 0, + 3, + PingEvent { + cpu: 60, + memory: 80, + disk: 20, + origin: Origin::Media(MediaOrigin {}), + webrtc: Some(ServiceStats { live: 100, max: 1000, active: true }), + }, + ); + assert_eq!(store.best_for(ServiceKind::Webrtc, None), None); } #[test] fn remote_ping() { - let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }); + let mut store = GatewayStore::new(0, Location { lat: 1.0, lon: 1.0 }, 60, 80, 90); store.on_ping( 0, 257, diff --git a/packages/media_gateway/src/store_service.rs b/packages/media_gateway/src/store_service.rs index d1ad9176..fbbf4aa5 100644 --- a/packages/media_gateway/src/store_service.rs +++ b/packages/media_gateway/src/store_service.rs @@ -16,11 +16,12 @@ use prost::Message as _; use crate::{ store::{GatewayStore, PingEvent}, - ServiceKind, DATA_PORT, STORE_SERVICE_ID, STORE_SERVICE_NAME, + NodeMetrics, ServiceKind, DATA_PORT, STORE_SERVICE_ID, STORE_SERVICE_NAME, }; #[derive(Debug, Clone)] pub enum Control { + NodeStats(NodeMetrics), FindNodeReq(u64, ServiceKind, Option), } @@ -41,9 +42,9 @@ where SC: From + TryInto, SE: From + TryInto, { - pub fn new(zone: u32, lat: f32, lon: f32) -> Self { + pub fn new(zone: u32, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self { Self { - store: GatewayStore::new(zone, Location { lat, lon }), + store: GatewayStore::new(zone, Location { lat, lon }, max_cpu, max_memory, max_disk), queue: VecDeque::from([ServiceOutput::FeatureControl(data::Control::DataListen(DATA_PORT).into())]), seq: 0, _tmp: std::marker::PhantomData, @@ -127,6 +128,10 @@ where let out = self.store.best_for(kind, location); self.queue.push_back(ServiceOutput::Event(actor, Event::FindNodeRes(req_id, out).into())); } + Control::NodeStats(metrics) => { + log::debug!("[GatewayStoreService] node metrics {:?}", metrics); + self.store.on_node_metrics(now, metrics); + } } } } @@ -175,15 +180,21 @@ pub struct GatewayStoreServiceBuilder { zone: u32, lat: f32, lon: f32, + max_memory: u8, + max_disk: u8, + max_cpu: u8, } impl GatewayStoreServiceBuilder { - pub fn new(zone: u32, lat: f32, lon: f32) -> Self { + pub fn new(zone: u32, lat: f32, lon: f32, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self { Self { zone, lat, lon, _tmp: std::marker::PhantomData, + max_cpu, + max_memory, + max_disk, } } } @@ -209,7 +220,7 @@ where } fn create(&self) -> Box> { - Box::new(GatewayStoreService::new(self.zone, self.lat, self.lon)) + Box::new(GatewayStoreService::new(self.zone, self.lat, self.lon, self.max_cpu, self.max_memory, self.max_disk)) } fn create_worker(&self) -> Box> { diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 6c1c5487..b52b4503 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc, time::Instant}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Instant}; use atm0s_sdn::{ generate_node_addr, @@ -7,7 +7,7 @@ use atm0s_sdn::{ ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, NetInput, NetOutput, SdnExtIn, SdnExtOut, SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput, TimePivot, }; use media_server_core::cluster::{self, MediaCluster}; -use media_server_gateway::agent_service::GatewayAgentServiceBuilder; +use media_server_gateway::{agent_service::GatewayAgentServiceBuilder, ServiceKind, AGENT_SERVICE_ID}; use media_server_protocol::{ gateway::generate_gateway_zone_tag, protobuf::gateway::{ConnectResponse, RemoteIceResponse}, @@ -27,10 +27,13 @@ use sans_io_runtime::{ }; use transport_webrtc::{GroupInput, MediaWorkerWebrtc, VariantParams, WebrtcOwner}; +const FEEDBACK_GATEWAY_AGENT_INTERVAL: u64 = 1000; //only feedback every second + pub struct MediaConfig { pub ice_lite: bool, pub webrtc_addrs: Vec, pub secure: Arc, + pub max_live: HashMap, } pub type SdnConfig = SdnWorkerCfg; @@ -87,6 +90,7 @@ enum MediaClusterOwner { #[allow(clippy::type_complexity)] pub struct MediaServerWorker { + worker: u16, sdn_slot: usize, sdn_worker: TaskSwitcherBranch, SdnWorkerOutput>, media_cluster: TaskSwitcherBranch, cluster::Output>, @@ -94,12 +98,13 @@ pub struct MediaServerWorker { switcher: TaskSwitcher, queue: DynamicDeque, timer: TimePivot, + last_feedback_gateway_agent: u64, secure: Arc, } impl MediaServerWorker { #[allow(clippy::too_many_arguments)] - pub fn new(node_id: u32, session: u64, secret: &str, controller: bool, sdn_udp: u16, sdn_custom_addrs: Vec, sdn_zone: u32, media: MediaConfig) -> Self { + pub fn new(worker: u16, node_id: u32, session: u64, secret: &str, controller: bool, sdn_udp: u16, sdn_custom_addrs: Vec, sdn_zone: u32, media: MediaConfig) -> Self { let secure = media.secure.clone(); //TODO why need this? let sdn_udp_addr = SocketAddr::from(([0, 0, 0, 0], sdn_udp)); @@ -107,7 +112,7 @@ impl MediaServerWorker { let visualization = Arc::new(visualization::VisualizationServiceBuilder::new(false)); let discovery = Arc::new(manual_discovery::ManualDiscoveryServiceBuilder::new(node_addr, vec![], vec![generate_gateway_zone_tag(sdn_zone)])); - let gateway = Arc::new(GatewayAgentServiceBuilder::default()); + let gateway = Arc::new(GatewayAgentServiceBuilder::new(media.max_live)); let sdn_config = SdnConfig { node_id, @@ -131,6 +136,7 @@ impl MediaServerWorker { }; Self { + worker, sdn_slot: 1, //TODO dont use this hack, must to wait to bind success to network sdn_worker: TaskSwitcherBranch::new(SdnWorker::new(sdn_config), TaskType::Sdn), media_cluster: TaskSwitcherBranch::default(TaskType::MediaCluster), @@ -138,6 +144,7 @@ impl MediaServerWorker { switcher: TaskSwitcher::new(3), queue: DynamicDeque::from([Output::Net(Owner::Sdn, BackendOutgoing::UdpListen { addr: sdn_udp_addr, reuse: true })]), timer: TimePivot::build(), + last_feedback_gateway_agent: 0, secure, } } @@ -152,7 +159,20 @@ impl MediaServerWorker { self.sdn_worker.input(s).on_tick(now_ms); self.media_cluster.input(s).on_tick(now); self.media_webrtc.input(s).on_tick(now); - //TODO collect node stats then send to GatewayAgent service + + if self.last_feedback_gateway_agent + FEEDBACK_GATEWAY_AGENT_INTERVAL <= now_ms { + self.last_feedback_gateway_agent = now_ms; + + let webrtc_live = self.media_webrtc.tasks() as u32; + self.sdn_worker.input(s).on_event( + now_ms, + SdnWorkerInput::Ext(SdnExtIn::ServicesControl( + AGENT_SERVICE_ID.into(), + 0.into(), + media_server_gateway::agent_service::Control::WorkerUsage(ServiceKind::Webrtc, self.worker, webrtc_live).into(), + )), + ); + } } pub fn on_event(&mut self, now: Instant, input: Input) {