Skip to content

Commit

Permalink
fixed: route restart-ice to other media node if current down
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Aug 7, 2024
1 parent faf69e2 commit 0fdb58c
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/media_z0_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--webrtc-port-seed 10200 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z0_n3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--webrtc-port-seed 10300 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z256_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--webrtc-port-seed 11200 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z256_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--webrtc-port-seed 11300 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pub enum MediaServerError {
NodePoolEmpty = 0x00020003,
MediaResError = 0x00020004,
NotImplemented = 0x00020005,
NodeTimeout = 0x00020006,
}
1 change: 1 addition & 0 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
max_sessions = max;
}
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res),
media_server_gateway::store_service::Event::FindDestRes(req_id, res) => requester.on_find_dest_res(req_id, res),
},
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {}
Expand Down
55 changes: 43 additions & 12 deletions bin/src/server/gateway/dest_selector.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
use std::collections::HashMap;

use atm0s_sdn::NodeId;
use media_server_gateway::ServiceKind;
use media_server_protocol::protobuf::cluster_gateway::ping_event::gateway_origin::Location;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot,
};

type QueryRequest = (ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<u32>>);
/// Query sent from a `GatewayDestSelector` handle over the mpsc channel.
/// Every variant carries the oneshot sender through which the answer is delivered back.
enum QueryRequest {
/// Pick a destination for a service kind, with an optional `(lat, lon)` location hint.
Select(ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<NodeId>>),
/// Resolve the forwarding destination for one specific target node.
DestFor(ServiceKind, NodeId, oneshot::Sender<Option<NodeId>>),
}

/// Cloneable handle used to submit destination queries.
/// It only holds the sending half of the query channel.
#[derive(Clone)]
pub struct GatewayDestSelector {
// Sending half of the channel that carries `QueryRequest` values.
tx: Sender<QueryRequest>,
}

impl GatewayDestSelector {
pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option<u32> {
/// Selects the best destination for `kind`; it can be a media node or another gateway node.
///
/// `location` is an optional `(lat, lon)` hint. Returns `None` when the query cannot be
/// queued, when the reply channel is dropped before answering, or when the answer itself is `None`.
pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option<NodeId> {
let (tx, rx) = oneshot::channel();
self.tx.send(QueryRequest::Select(kind, location, tx)).await.ok()?;
rx.await.ok()?
}

/// Finds the forwarding destination to use when a request must reach `node`.
/// If the node is in the current zone, returns `Some(node)` when it is available;
/// if the node is in another zone, returns that zone's gateway node.
///
/// Returns `None` when the query cannot be queued, when the reply channel is dropped
/// before answering, or when the answer itself is `None`.
pub async fn dest_for(&self, kind: ServiceKind, node: NodeId) -> Option<NodeId> {
let (tx, rx) = oneshot::channel();
self.tx.send((kind, location, tx)).await.ok()?;
self.tx.send(QueryRequest::DestFor(kind, node, tx)).await.ok()?;
rx.await.ok()?
}
}
Expand All @@ -37,16 +51,33 @@ impl GatewayDestRequester {
}
}

/// Hands the answer of a pending destination lookup to the caller waiting on it.
///
/// The reply sender stored under `req_id` is taken out of the pending map and `res` is
/// pushed through it. An unknown `req_id` is silently ignored; a failed send is only logged.
pub fn on_find_dest_res(&mut self, req_id: u64, res: Option<u32>) {
    // Nothing is pending under this id: there is nobody to answer.
    let Some(reply) = self.reqs.remove(&req_id) else {
        return;
    };

    if reply.send(res).is_err() {
        log::error!("[GatewayDestRequester] answer for req_id {req_id} error");
    }
}

/// Takes at most one queued query off the channel without blocking (`try_recv`) and turns it
/// into a store-service control message.
///
/// Returns `None` when `try_recv` yields nothing. Otherwise a fresh request id is allocated
/// from `req_seed`, the reply sender is remembered in `reqs` under that id, and the matching
/// `FindNodeReq` / `FindDestReq` control is returned.
pub fn recv(&mut self) -> Option<media_server_gateway::store_service::Control> {
let (kind, location, tx) = self.rx.try_recv().ok()?;
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindNodeReq(
req_id,
kind,
location.map(|(lat, lon)| Location { lat, lon }),
))
match self.rx.try_recv().ok()? {
QueryRequest::Select(kind, location, tx) => {
// Allocate the next id and keep the reply sender so the answer can be routed back later.
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindNodeReq(
req_id,
kind,
location.map(|(lat, lon)| Location { lat, lon }),
))
}
QueryRequest::DestFor(kind, dest, tx) => {
// Same bookkeeping as above, but the control asks for the route to a specific node.
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindDestReq(req_id, kind, dest))
}
}
}
}

Expand Down
79 changes: 38 additions & 41 deletions bin/src/server/gateway/local_rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl MediaLocalRpcHandler {
}
} else {
self.feedback_route_error(session_id, now_ms() - started_at, Some(node_id), ErrorType::Timeout).await;
Err(RpcError::new2(MediaServerError::GatewayRpcError))
Err(RpcError::new2(MediaServerError::NodeTimeout))
}
} else {
self.feedback_route_error(session_id, now_ms() - started_at, None, ErrorType::PoolEmpty).await;
Expand All @@ -325,22 +325,16 @@ impl MediaLocalRpcHandler {
}

/// Forwards remote ICE candidates for connection `conn` to the node named in `conn_part`.
///
/// The request is sent over the gateway RPC client to the node's virtual address on
/// `GATEWAY_RPC_PORT`.
///
/// Errors:
/// - `MediaServerError::InvalidConnId` when `conn_part` is `None`.
/// - `MediaServerError::GatewayRpcError` when the RPC call yields no response.
async fn webrtc_remote_ice(&self, conn_part: Option<(NodeId, u64)>, conn: ClusterConnId, param: RemoteIceRequest) -> RpcResult<RemoteIceResponse> {
if let Some((node, _session)) = conn_part {
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest {
conn: conn.to_string(),
candidates: param.candidates,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await;
if let Some(res) = res {
Ok(RemoteIceResponse { added: res.added })
} else {
Err(RpcError::new2(MediaServerError::GatewayRpcError))
}
} else {
Err(RpcError::new2(MediaServerError::InvalidConnId))
}
let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?;
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest {
conn: conn.to_string(),
candidates: param.candidates,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await;
let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?;
Ok(RemoteIceResponse { added: res.added })
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -354,31 +348,34 @@ impl MediaLocalRpcHandler {
extra_data: Option<String>,
record: bool,
) -> RpcResult<(ClusterConnId, ConnectResponse)> {
//TODO how to handle media-node down?
if let Some((node, _session)) = conn_part {
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest {
conn: conn.to_string(),
ip: ip.to_string(),
user_agent,
req: Some(req),
record,
extra_data,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await;
if let Some(res) = res {
if let Some(res) = res.res {
Ok((res.conn_id.parse().unwrap(), res))
} else {
Err(RpcError::new2(MediaServerError::MediaResError))
let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?;
let dest = match self.selector.dest_for(ServiceKind::Webrtc, node).await {
Some(dest) => dest,
None => match self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(&ip)).await {
Some(dest) => {
log::warn!("[Gateway] not found dest {node} found other node {dest} for restart-ice (reconnect to other server)");
dest
}
} else {
Err(RpcError::new2(MediaServerError::GatewayRpcError))
}
} else {
Err(RpcError::new2(MediaServerError::InvalidConnId))
}
None => {
log::warn!("[Gateway] node pool empty for restart-ice to dest {node}");
return RpcResult::Err(RpcError::new2(MediaServerError::NodePoolEmpty));
}
},
};
log::info!("[Gateway] selected dest node {dest} with provided node {node}");
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest {
conn: conn.to_string(),
ip: ip.to_string(),
user_agent,
req: Some(req),
record,
extra_data,
};
let sock_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT);
let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await;
let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?;
let res = res.res.ok_or(RpcError::new2(MediaServerError::MediaResError))?;
Ok((res.conn_id.parse().unwrap(), res))
}

/*
Expand Down
1 change: 1 addition & 0 deletions packages/media_gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ edition = "2021"
log = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
media-server-protocol = { path = "../protocol" }
media-server-utils = { path = "../media_utils" }
atm0s-sdn = { workspace = true }
prost = { workspace = true }
16 changes: 13 additions & 3 deletions packages/media_gateway/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use atm0s_sdn::NodeId;
use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, Origin, ServiceStats};

use crate::{NodeMetrics, ServiceKind};
Expand Down Expand Up @@ -32,8 +33,8 @@ impl GatewayStore {
pub fn new(zone: u32, location: Location, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
Self {
node: NodeMetrics::default(),
webrtc: ServiceStore::new(ServiceKind::Webrtc, location),
rtpengine: ServiceStore::new(ServiceKind::RtpEngine, location),
webrtc: ServiceStore::new(zone, ServiceKind::Webrtc, location),
rtpengine: ServiceStore::new(zone, ServiceKind::RtpEngine, location),
zone,
location,
output: None,
Expand Down Expand Up @@ -101,7 +102,7 @@ impl GatewayStore {
}
}

pub fn best_for(&self, kind: ServiceKind, location: Option<Location>) -> Option<u32> {
pub fn best_for(&self, kind: ServiceKind, location: Option<Location>) -> Option<NodeId> {
let node = match kind {
ServiceKind::Webrtc => self.webrtc.best_for(location),
ServiceKind::RtpEngine => self.rtpengine.best_for(location),
Expand All @@ -110,6 +111,15 @@ impl GatewayStore {
node
}

/// Looks up the forwarding destination for `dest` in the store that tracks `kind`.
///
/// The lookup is delegated to the matching per-service store; the outcome is logged at
/// debug level and then returned unchanged.
pub fn dest_for(&self, kind: ServiceKind, dest: NodeId) -> Option<NodeId> {
    // Choose the per-service store first so the lookup itself is written only once.
    let store = match kind {
        ServiceKind::Webrtc => &self.webrtc,
        ServiceKind::RtpEngine => &self.rtpengine,
    };
    let found = store.dest_for(dest);
    log::debug!("[GatewayStore] query dest {:?} for node {} got {:?}", kind, dest, found);
    found
}

/// Returns the local stats reported by the WebRTC service store.
// NOTE(review): only `webrtc` is consulted here; `rtpengine` is not included — confirm this is intended.
pub fn local_stats(&self) -> Option<ServiceStats> {
self.webrtc.local_stats()
}
Expand Down
Loading

0 comments on commit 0fdb58c

Please sign in to comment.