Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed: route restart-ice to other media node if current down #410

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/media_z0_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--webrtc-port-seed 10200 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z0_n3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--webrtc-port-seed 10300 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z256_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--webrtc-port-seed 11200 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/media_z256_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ cargo run -- \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--webrtc-port-seed 11300 \
--enable-token-api
1 change: 1 addition & 0 deletions bin/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pub enum MediaServerError {
NodePoolEmpty = 0x00020003,
MediaResError = 0x00020004,
NotImplemented = 0x00020005,
NodeTimeout = 0x00020006,
}
1 change: 1 addition & 0 deletions bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@
max_sessions = max;
}
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res),
media_server_gateway::store_service::Event::FindDestRes(req_id, res) => requester.on_find_dest_res(req_id, res),

Check warning on line 244 in bin/src/server/gateway.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway.rs#L244

Added line #L244 was not covered by tests
},
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {}
Expand Down
55 changes: 43 additions & 12 deletions bin/src/server/gateway/dest_selector.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
use std::collections::HashMap;

use atm0s_sdn::NodeId;
use media_server_gateway::ServiceKind;
use media_server_protocol::protobuf::cluster_gateway::ping_event::gateway_origin::Location;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot,
};

type QueryRequest = (ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<u32>>);
/// Query message sent by `GatewayDestSelector` over its mpsc channel.
/// Every variant carries a oneshot sender on which the `Option<NodeId>` answer is returned.
enum QueryRequest {
/// Ask for a destination for the given service kind, optionally with a `(lat, lon)` location.
Select(ServiceKind, Option<(f32, f32)>, oneshot::Sender<Option<NodeId>>),
/// Ask for the destination to use when a request must reach the given node.
DestFor(ServiceKind, NodeId, oneshot::Sender<Option<NodeId>>),
}

/// Cloneable handle used to submit `QueryRequest`s and await their answers.
#[derive(Clone)]
pub struct GatewayDestSelector {
// Sending half of the query channel; `select` and `dest_for` push requests through it.
tx: Sender<QueryRequest>,
}

impl GatewayDestSelector {
pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option<u32> {
/// Select the best destination; it can be a media node or another gateway node.
pub async fn select(&self, kind: ServiceKind, location: Option<(f32, f32)>) -> Option<NodeId> {
let (tx, rx) = oneshot::channel();
self.tx.send(QueryRequest::Select(kind, location, tx)).await.ok()?;
rx.await.ok()?
}

Check warning on line 27 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L23-L27

Added lines #L23 - L27 were not covered by tests

/// Find the forwarding destination when a request must be sent to a specific node.
/// If the node is in the current zone, return Some(node) if it is available.
/// If the node is in another zone, return that zone's gateway node.
pub async fn dest_for(&self, kind: ServiceKind, node: NodeId) -> Option<NodeId> {

Check warning on line 32 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L32

Added line #L32 was not covered by tests
let (tx, rx) = oneshot::channel();
self.tx.send((kind, location, tx)).await.ok()?;
self.tx.send(QueryRequest::DestFor(kind, node, tx)).await.ok()?;

Check warning on line 34 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L34

Added line #L34 was not covered by tests
rx.await.ok()?
}
}
Expand All @@ -37,16 +51,33 @@
}
}

/// Hands the result of a find-dest query back to whoever is waiting on `req_id`.
///
/// The pending reply sender is removed from `reqs`; a `req_id` with no pending
/// entry is ignored. If the result cannot be sent, an error is logged.
pub fn on_find_dest_res(&mut self, req_id: u64, res: Option<u32>) {
    let reply_tx = match self.reqs.remove(&req_id) {
        Some(pending) => pending,
        None => return,
    };
    if let Err(_unsent) = reply_tx.send(res) {
        log::error!("[GatewayDestRequester] answer for req_id {req_id} error");
    }
}

Check warning on line 60 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L54-L60

Added lines #L54 - L60 were not covered by tests

pub fn recv(&mut self) -> Option<media_server_gateway::store_service::Control> {
let (kind, location, tx) = self.rx.try_recv().ok()?;
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindNodeReq(
req_id,
kind,
location.map(|(lat, lon)| Location { lat, lon }),
))
match self.rx.try_recv().ok()? {
QueryRequest::Select(kind, location, tx) => {
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindNodeReq(
req_id,
kind,
location.map(|(lat, lon)| Location { lat, lon }),
))

Check warning on line 72 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L63-L72

Added lines #L63 - L72 were not covered by tests
}
QueryRequest::DestFor(kind, dest, tx) => {
let req_id = self.req_seed;
self.req_seed += 1;
self.reqs.insert(req_id, tx);
Some(media_server_gateway::store_service::Control::FindDestReq(req_id, kind, dest))

Check warning on line 78 in bin/src/server/gateway/dest_selector.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/dest_selector.rs#L74-L78

Added lines #L74 - L78 were not covered by tests
}
}
}
}

Expand Down
79 changes: 38 additions & 41 deletions bin/src/server/gateway/local_rpc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@
}
} else {
self.feedback_route_error(session_id, now_ms() - started_at, Some(node_id), ErrorType::Timeout).await;
Err(RpcError::new2(MediaServerError::GatewayRpcError))
Err(RpcError::new2(MediaServerError::NodeTimeout))

Check warning on line 319 in bin/src/server/gateway/local_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/local_rpc_handler.rs#L319

Added line #L319 was not covered by tests
}
} else {
self.feedback_route_error(session_id, now_ms() - started_at, None, ErrorType::PoolEmpty).await;
Expand All @@ -325,22 +325,16 @@
}

async fn webrtc_remote_ice(&self, conn_part: Option<(NodeId, u64)>, conn: ClusterConnId, param: RemoteIceRequest) -> RpcResult<RemoteIceResponse> {
if let Some((node, _session)) = conn_part {
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest {
conn: conn.to_string(),
candidates: param.candidates,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await;
if let Some(res) = res {
Ok(RemoteIceResponse { added: res.added })
} else {
Err(RpcError::new2(MediaServerError::GatewayRpcError))
}
} else {
Err(RpcError::new2(MediaServerError::InvalidConnId))
}
let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?;
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRemoteIceRequest {
conn: conn.to_string(),
candidates: param.candidates,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_remote_ice(sock_addr, rpc_req).await;
let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?;
Ok(RemoteIceResponse { added: res.added })

Check warning on line 337 in bin/src/server/gateway/local_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/local_rpc_handler.rs#L328-L337

Added lines #L328 - L337 were not covered by tests
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -354,31 +348,34 @@
extra_data: Option<String>,
record: bool,
) -> RpcResult<(ClusterConnId, ConnectResponse)> {
//TODO how to handle media-node down?
if let Some((node, _session)) = conn_part {
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest {
conn: conn.to_string(),
ip: ip.to_string(),
user_agent,
req: Some(req),
record,
extra_data,
};
log::info!("[Gateway] selected node {node}");
let sock_addr = node_vnet_addr(node, GATEWAY_RPC_PORT);
let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await;
if let Some(res) = res {
if let Some(res) = res.res {
Ok((res.conn_id.parse().unwrap(), res))
} else {
Err(RpcError::new2(MediaServerError::MediaResError))
let (node, _session) = conn_part.ok_or(RpcError::new2(MediaServerError::InvalidConnId))?;
let dest = match self.selector.dest_for(ServiceKind::Webrtc, node).await {
Some(dest) => dest,
None => match self.selector.select(ServiceKind::Webrtc, self.ip2location.get_location(&ip)).await {
Some(dest) => {
log::warn!("[Gateway] not found dest {node} found other node {dest} for restart-ice (reconnect to other server)");
dest

Check warning on line 357 in bin/src/server/gateway/local_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/local_rpc_handler.rs#L351-L357

Added lines #L351 - L357 were not covered by tests
}
} else {
Err(RpcError::new2(MediaServerError::GatewayRpcError))
}
} else {
Err(RpcError::new2(MediaServerError::InvalidConnId))
}
None => {
log::warn!("[Gateway] node pool empty for restart-ice to dest {node}");
return RpcResult::Err(RpcError::new2(MediaServerError::NodePoolEmpty));

Check warning on line 361 in bin/src/server/gateway/local_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/local_rpc_handler.rs#L360-L361

Added lines #L360 - L361 were not covered by tests
}
},
};
log::info!("[Gateway] selected dest node {dest} with provided node {node}");
let rpc_req = media_server_protocol::protobuf::cluster_gateway::WebrtcRestartIceRequest {
conn: conn.to_string(),
ip: ip.to_string(),
user_agent,
req: Some(req),
record,
extra_data,
};
let sock_addr = node_vnet_addr(dest, GATEWAY_RPC_PORT);
let res = self.client.webrtc_restart_ice(sock_addr, rpc_req).await;
let res = res.ok_or(RpcError::new2(MediaServerError::GatewayRpcError))?;
let res = res.res.ok_or(RpcError::new2(MediaServerError::MediaResError))?;
Ok((res.conn_id.parse().unwrap(), res))

Check warning on line 378 in bin/src/server/gateway/local_rpc_handler.rs

View check run for this annotation

Codecov / codecov/patch

bin/src/server/gateway/local_rpc_handler.rs#L365-L378

Added lines #L365 - L378 were not covered by tests
}

/*
Expand Down
1 change: 1 addition & 0 deletions packages/media_gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ edition = "2021"
log = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
media-server-protocol = { path = "../protocol" }
media-server-utils = { path = "../media_utils" }
atm0s-sdn = { workspace = true }
prost = { workspace = true }
16 changes: 13 additions & 3 deletions packages/media_gateway/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use atm0s_sdn::NodeId;
use media_server_protocol::protobuf::cluster_gateway::ping_event::{gateway_origin::Location, GatewayOrigin, Origin, ServiceStats};

use crate::{NodeMetrics, ServiceKind};
Expand Down Expand Up @@ -32,8 +33,8 @@
pub fn new(zone: u32, location: Location, max_cpu: u8, max_memory: u8, max_disk: u8) -> Self {
Self {
node: NodeMetrics::default(),
webrtc: ServiceStore::new(ServiceKind::Webrtc, location),
rtpengine: ServiceStore::new(ServiceKind::RtpEngine, location),
webrtc: ServiceStore::new(zone, ServiceKind::Webrtc, location),
rtpengine: ServiceStore::new(zone, ServiceKind::RtpEngine, location),
zone,
location,
output: None,
Expand Down Expand Up @@ -101,7 +102,7 @@
}
}

pub fn best_for(&self, kind: ServiceKind, location: Option<Location>) -> Option<u32> {
pub fn best_for(&self, kind: ServiceKind, location: Option<Location>) -> Option<NodeId> {
let node = match kind {
ServiceKind::Webrtc => self.webrtc.best_for(location),
ServiceKind::RtpEngine => self.rtpengine.best_for(location),
Expand All @@ -110,6 +111,15 @@
node
}

pub fn dest_for(&self, kind: ServiceKind, dest: NodeId) -> Option<NodeId> {
let node = match kind {
ServiceKind::Webrtc => self.webrtc.dest_for(dest),
ServiceKind::RtpEngine => self.rtpengine.dest_for(dest),

Check warning on line 117 in packages/media_gateway/src/store.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_gateway/src/store.rs#L114-L117

Added lines #L114 - L117 were not covered by tests
};
log::debug!("[GatewayStore] query dest {:?} for node {} got {:?}", kind, dest, node);
node
}

Check warning on line 121 in packages/media_gateway/src/store.rs

View check run for this annotation

Codecov / codecov/patch

packages/media_gateway/src/store.rs#L119-L121

Added lines #L119 - L121 were not covered by tests

/// Returns the local service stats by forwarding to the `webrtc` store's `local_stats`.
/// The `rtpengine` store is not consulted here.
pub fn local_stats(&self) -> Option<ServiceStats> {
self.webrtc.local_stats()
}
Expand Down
Loading
Loading