Skip to content

Commit

Permalink
feat: node info endpoint (8xFF#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
luongngocminh authored Jan 5, 2024
1 parent da6906b commit fdf1dba
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 15 deletions.
2 changes: 2 additions & 0 deletions packages/cluster/src/define/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use atm0s_sdn::NodeAddr;

mod endpoint;
mod local_track;
Expand Down Expand Up @@ -56,6 +57,7 @@ where
C: ClusterEndpoint,
{
fn node_id(&self) -> u32;
fn node_addr(&self) -> NodeAddr;
fn build(&mut self, room_id: &str, peer_id: &str) -> C;
}

Expand Down
16 changes: 16 additions & 0 deletions packages/cluster/src/define/rpc/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,19 @@ pub enum MediaSessionProtocol {
Rtmp,
Sip,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct NodeInfo {
pub node_id: u32,
pub address: String,
pub server_type: ServerType,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum ServerType {
GATEWAY,
CONNECTOR,
SIP,
WEBRTC,
RTMP,
}
6 changes: 6 additions & 0 deletions packages/cluster/src/implement/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ compose_transport!(UdpTcpTransport, udp: UdpTransport, tcp: TcpTransport);

pub struct ServerSdn {
node_id: NodeId,
node_addr: NodeAddr,
join_handler: Option<async_std::task::JoinHandle<()>>,
pubsub_sdk: PubsubSdk,
kv_sdk: KeyValueSdk,
Expand Down Expand Up @@ -99,6 +100,7 @@ impl ServerSdn {
(
Self {
node_id,
node_addr: node_addr_builder.addr(),
pubsub_sdk,
kv_sdk,
join_handler: Some(join_handler),
Expand All @@ -114,6 +116,10 @@ impl Cluster<endpoint::ClusterEndpointSdn> for ServerSdn {
self.node_id
}

fn node_addr(&self) -> NodeAddr {
self.node_addr.clone()
}

fn build(&mut self, room_id: &str, peer_id: &str) -> endpoint::ClusterEndpointSdn {
endpoint::ClusterEndpointSdn::new(room_id, peer_id, self.pubsub_sdk.clone(), self.kv_sdk.clone(), self.rpc_emitter.clone())
}
Expand Down
19 changes: 15 additions & 4 deletions servers/media-server/src/server/connector.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use clap::Parser;
use cluster::{
rpc::{connector::MediaEndpointLogResponse, RpcEmitter, RpcEndpoint, RpcRequest},
rpc::{
connector::MediaEndpointLogResponse,
general::{NodeInfo, ServerType},
RpcEmitter, RpcEndpoint, RpcRequest,
},
Cluster, ClusterEndpoint,
};
use futures::{select, FutureExt};
use metrics_dashboard::build_dashboard_route;
use poem::Route;
use poem::{web::Json, Route};
use poem_openapi::OpenApiService;
use protocol::media_event_logs::MediaEndpointLogRequest;

Expand All @@ -15,7 +19,7 @@ mod rpc;
mod transports;

use self::{
rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, RpcEvent, InternalControl},
rpc::{cluster::ConnectorClusterRpc, http::ConnectorHttpApis, InternalControl, RpcEvent},
transports::nats::NatsTransporter,
transports::{parse_uri, ConnectorTransporter},
};
Expand All @@ -39,7 +43,7 @@ pub struct ConnectorArgs {
pub max_conn: u64,
}

pub async fn run_connector_server<C, CR, RPC, REQ, EMITTER>(http_port: u16, _opts: ConnectorArgs, ctx: MediaServerContext<InternalControl>, _cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str>
pub async fn run_connector_server<C, CR, RPC, REQ, EMITTER>(http_port: u16, _opts: ConnectorArgs, ctx: MediaServerContext<InternalControl>, cluster: C, rpc_endpoint: RPC) -> Result<(), &'static str>
where
C: Cluster<CR> + Send + 'static,
CR: ClusterEndpoint + Send + 'static,
Expand Down Expand Up @@ -70,6 +74,12 @@ where
}
};

let node_info = NodeInfo {
node_id: cluster.node_id(),
address: format!("{}", cluster.node_addr()),
server_type: ServerType::CONNECTOR,
};

let api_service = OpenApiService::new(ConnectorHttpApis, "Connector Server", "1.0.0").server(format!("http://localhost:{}", http_port));
let ui = api_service.swagger_ui();
let spec = api_service.spec();
Expand All @@ -78,6 +88,7 @@ where
.nest("/", api_service)
.nest("/dashboard/", build_dashboard_route())
.nest("/ui/", ui)
.at("/node-info/", poem::endpoint::make_sync(move |_| Json(node_info.clone())))
.at("/spec/", poem::endpoint::make_sync(move |_| spec.clone()));

http_server.start(route, ctx).await;
Expand Down
13 changes: 9 additions & 4 deletions servers/media-server/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ use async_std::stream::StreamExt;
use clap::Parser;
use cluster::{
rpc::{
general::{MediaEndpointCloseRequest, MediaEndpointCloseResponse},
general::{MediaEndpointCloseRequest, MediaEndpointCloseResponse, NodeInfo, ServerType},
webrtc::{WebrtcPatchRequest, WebrtcPatchResponse, WebrtcRemoteIceRequest, WebrtcRemoteIceResponse},
RpcEmitter, RpcEndpoint, RpcRequest, RPC_MEDIA_ENDPOINT_CLOSE, RPC_WEBRTC_CONNECT, RPC_WEBRTC_ICE, RPC_WEBRTC_PATCH, RPC_WHEP_CONNECT, RPC_WHIP_CONNECT,
},
Cluster, ClusterEndpoint, MEDIA_SERVER_SERVICE,
Cluster, ClusterEndpoint, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};
use futures::{select, FutureExt};
use media_utils::{SystemTimer, Timer};
use metrics::describe_counter;
use metrics_dashboard::build_dashboard_route;
use poem::Route;
use poem::{web::Json, Route};
use poem_openapi::OpenApiService;

use crate::rpc::http::HttpRpcServer;
Expand Down Expand Up @@ -67,7 +67,11 @@ where
let api_service = OpenApiService::new(GatewayHttpApis, "Gateway Server", "1.0.0").server("http://localhost:3000");
let ui = api_service.swagger_ui();
let spec = api_service.spec();

let node_info = NodeInfo {
node_id: cluster.node_id(),
address: format!("{}", cluster.node_addr()),
server_type: ServerType::GATEWAY,
};
#[cfg(feature = "embed-samples")]
let samples = EmbeddedFilesEndpoint::<Files>::new(Some("index.html".to_string()));
#[cfg(not(feature = "embed-samples"))]
Expand All @@ -76,6 +80,7 @@ where
.nest("/", api_service)
.nest("/dashboard/", build_dashboard_route())
.nest("/ui/", ui)
.at("/node-info/", poem::endpoint::make_sync(move |_| Json(node_info.clone())))
.at("/spec/", poem::endpoint::make_sync(move |_| spec.clone()))
.nest("/samples", samples);

Expand Down
12 changes: 9 additions & 3 deletions servers/media-server/src/server/rtmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use clap::Parser;
use cluster::{
rpc::{
gateway::{NodeHealthcheckResponse, NodePing, NodePong, ServiceInfo},
general::{MediaEndpointCloseResponse, MediaSessionProtocol},
general::{MediaEndpointCloseResponse, MediaSessionProtocol, NodeInfo, ServerType},
RpcEmitter, RpcEndpoint, RpcRequest, RPC_NODE_PING,
},
Cluster, ClusterEndpoint, INNER_GATEWAY_SERVICE,
Cluster, ClusterEndpoint, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};
use futures::{select, FutureExt};
use media_utils::{AutoCancelTask, ErrorDebugger};
use metrics_dashboard::build_dashboard_route;
use poem::Route;
use poem::{web::Json, Route};
use poem_openapi::OpenApiService;

use crate::rpc::http::HttpRpcServer;
Expand Down Expand Up @@ -75,6 +75,11 @@ where
let api_service = OpenApiService::new(RtmpHttpApis, "Rtmp Server", "1.0.0").server("http://localhost:3000");
let ui = api_service.swagger_ui();
let spec = api_service.spec();
let node_info = NodeInfo {
node_id: cluster.node_id(),
address: format!("{}", cluster.node_addr()),
server_type: ServerType::RTMP,
};

#[cfg(feature = "embed-samples")]
let samples = EmbeddedFilesEndpoint::<Files>::new(Some("index.html".to_string()));
Expand All @@ -84,6 +89,7 @@ where
.nest("/", api_service)
.nest("/dashboard/", build_dashboard_route())
.nest("/ui/", ui)
.at("/node-info/", poem::endpoint::make_sync(move |_| Json(node_info.clone())))
.at("/spec/", poem::endpoint::make_sync(move |_| spec.clone()))
.nest("/samples", samples);

Expand Down
13 changes: 9 additions & 4 deletions servers/media-server/src/server/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ use clap::Parser;
use cluster::{
rpc::{
gateway::{NodeHealthcheckResponse, NodePing, NodePong, ServiceInfo},
general::MediaEndpointCloseResponse,
general::{MediaEndpointCloseResponse, NodeInfo, ServerType},
webrtc::{WebrtcConnectRequestSender, WebrtcConnectResponse, WebrtcPatchRequest, WebrtcPatchResponse, WebrtcRemoteIceRequest, WebrtcRemoteIceResponse},
whep::WhepConnectResponse,
whip::WhipConnectResponse,
RpcEmitter, RpcEndpoint, RpcReqRes, RpcRequest, RPC_NODE_PING,
},
Cluster, ClusterEndpoint, EndpointSubscribeScope, MixMinusAudioMode, RemoteBitrateControlMode, VerifyObject, INNER_GATEWAY_SERVICE,
Cluster, ClusterEndpoint, EndpointSubscribeScope, MixMinusAudioMode, RemoteBitrateControlMode, VerifyObject, INNER_GATEWAY_SERVICE, MEDIA_SERVER_SERVICE,
};
use endpoint::BitrateLimiterType;
use futures::{select, FutureExt};
use media_utils::{AutoCancelTask, ErrorDebugger, StringCompression, SystemTimer, Timer};
use metrics_dashboard::build_dashboard_route;
use poem::Route;
use poem::{web::Json, Route};
use poem_openapi::OpenApiService;
use transport_webrtc::{SdkTransportLifeCycle, SdpBoxRewriteScope, WhepTransportLifeCycle, WhipTransportLifeCycle};

Expand Down Expand Up @@ -69,7 +69,11 @@ where
let timer = SystemTimer();
let mut rpc_endpoint = WebrtcClusterRpc::new(rpc_endpoint);
let mut http_server: HttpRpcServer<RpcEvent> = crate::rpc::http::HttpRpcServer::new(http_port);

let node_info = NodeInfo {
node_id: cluster.node_id(),
address: format!("{}", cluster.node_addr()),
server_type: ServerType::WEBRTC,
};
let api_service = OpenApiService::new(WebrtcHttpApis, "Webrtc Server", "1.0.0").server("/");
let ui = api_service.swagger_ui();
let spec = api_service.spec();
Expand All @@ -82,6 +86,7 @@ where
.nest("/", api_service)
.nest("/dashboard/", build_dashboard_route())
.nest("/ui/", ui)
.at("/node-info/", poem::endpoint::make_sync(move |_| Json(node_info.clone())))
.at("/spec/", poem::endpoint::make_sync(move |_| spec.clone()))
.nest("/samples", samples);

Expand Down

0 comments on commit fdf1dba

Please sign in to comment.