From f6f5b7d357accacf6f7efa70c74adce1a5f478f3 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Fri, 14 Apr 2023 10:25:05 -0400 Subject: [PATCH] Refactor the relationship between the assorted web / websocket servers (#1844) * Refactor the relationship between the servers and handles * Rename RemoteViewerServer to WebViewerSink * CLI arguments for specifying ports * Proper typing for the ports --- Cargo.lock | 5 +- crates/re_viewer/src/remote_viewer_app.rs | 2 +- crates/re_viewer/src/web.rs | 2 +- crates/re_web_viewer_server/Cargo.toml | 2 +- crates/re_web_viewer_server/src/lib.rs | 118 +++++++++++++++-- crates/re_web_viewer_server/src/main.rs | 3 +- crates/re_ws_comms/Cargo.toml | 1 + crates/re_ws_comms/src/client.rs | 3 +- crates/re_ws_comms/src/lib.rs | 69 ++++++++-- crates/re_ws_comms/src/server.rs | 90 ++++++++++--- crates/rerun/src/clap.rs | 11 +- crates/rerun/src/run.rs | 46 +++++-- crates/rerun/src/web_viewer.rs | 146 +++++++++------------- examples/python/notebook/cube.ipynb | 2 +- rerun_py/Cargo.toml | 4 +- rerun_py/rerun_sdk/rerun/__init__.py | 15 ++- rerun_py/src/python_bridge.rs | 40 ++++-- rerun_py/src/python_session.rs | 31 +++-- 18 files changed, 417 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fc5d9814f11..24bf0e2dd7f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4216,7 +4216,6 @@ dependencies = [ name = "re_web_viewer_server" version = "0.4.0" dependencies = [ - "anyhow", "cargo_metadata", "ctrlc", "document-features", @@ -4226,6 +4225,7 @@ dependencies = [ "re_analytics", "re_build_web_viewer", "re_log", + "thiserror", "tokio", ] @@ -4243,6 +4243,7 @@ dependencies = [ "re_log", "re_log_types", "re_smart_channel", + "thiserror", "tokio", "tokio-tungstenite", "tungstenite", @@ -4356,6 +4357,8 @@ dependencies = [ "re_log_encoding", "re_log_types", "re_memory", + "re_web_viewer_server", + "re_ws_comms", "rerun", "thiserror", "tokio", diff --git a/crates/re_viewer/src/remote_viewer_app.rs b/crates/re_viewer/src/remote_viewer_app.rs index bcd6f37e609d..2012512f4f2f 100644 --- a/crates/re_viewer/src/remote_viewer_app.rs +++ b/crates/re_viewer/src/remote_viewer_app.rs @@ -56,7 +56,7 @@ impl RemoteViewerApp { } } Err(err) => { - re_log::error!("Failed to parse message: {}", re_error::format(&err)); + re_log::error!("Failed to parse message: {err}"); std::ops::ControlFlow::Break(()) } } diff --git a/crates/re_viewer/src/web.rs b/crates/re_viewer/src/web.rs index ca1008c8ce9e..58dcec080ce2 100644 --- a/crates/re_viewer/src/web.rs +++ b/crates/re_viewer/src/web.rs @@ -148,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String { url = param.clone(); } if url.is_empty() { - re_ws_comms::default_server_url(&info.web_info.location.hostname) + re_ws_comms::server_url(&info.web_info.location.hostname, Default::default()) } else { url } diff --git a/crates/re_web_viewer_server/Cargo.toml b/crates/re_web_viewer_server/Cargo.toml index 3631ce230645..2c519db466fe 100644 --- a/crates/re_web_viewer_server/Cargo.toml +++ b/crates/re_web_viewer_server/Cargo.toml @@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"] [dependencies] re_log.workspace = true -anyhow.workspace = true ctrlc.workspace = true document-features = "0.2" futures-util = "0.3" hyper = { version = "0.14", features = ["full"] } +thiserror.workspace = true tokio = { workspace = true, default-features = false, features = [ "macros", "rt-multi-thread", diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 7e97775a02a7..4d8f29172588 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -7,11 +7,17 @@ #![forbid(unsafe_code)] #![warn(clippy::all, rust_2018_idioms)] -use std::task::{Context, Poll}; +use std::{ + fmt::Display, + str::FromStr, + task::{Context, Poll}, +}; use futures_util::future; use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response}; +pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090; + #[cfg(not(feature = "__ci"))] mod data { // If you add/remove/change the paths here, also update the include-list in `Cargo.toml`! @@ -32,6 +38,21 @@ mod data { pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm"); } +#[derive(thiserror::Error, Debug)] +pub enum WebViewerServerError { + #[error("Could not parse address: {0}")] + AddrParseFailed(#[from] std::net::AddrParseError), + + #[error("failed to bind to port {0}: {1}")] + BindFailed(WebViewerServerPort, hyper::Error), + + #[error("failed to join web viewer server task: {0}")] + JoinError(#[from] tokio::task::JoinError), + + #[error("failed to serve web viewer: {0}")] + ServeFailed(hyper::Error), +} + struct Svc { // NOTE: Optional because it is possible to have the `analytics` feature flag enabled // while at the same time opting-out of analytics at run-time. @@ -149,27 +170,108 @@ impl Service for MakeSvc { // ---------------------------------------------------------------------------- -/// Hosts the Web Viewer Wasm+HTML +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Typed port for use with [`WebViewerServer`] +pub struct WebViewerServerPort(pub u16); + +impl Default for WebViewerServerPort { + fn default() -> Self { + Self(DEFAULT_WEB_VIEWER_SERVER_PORT) + } +} + +impl Display for WebViewerServerPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +// Needed for clap +impl FromStr for WebViewerServerPort { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.parse::() { + Ok(port) => Ok(WebViewerServerPort(port)), + Err(err) => Err(format!("Failed to parse port: {err}")), + } + } +} + +/// HTTP host for the Rerun Web Viewer application +/// This serves the HTTP+Wasm+JS files that make up the web-viewer. pub struct WebViewerServer { server: hyper::Server, } impl WebViewerServer { - pub fn new(port: u16) -> Self { - let bind_addr = format!("0.0.0.0:{port}").parse().unwrap(); - let server = hyper::Server::bind(&bind_addr).serve(MakeSvc); - Self { server } + /// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port. + /// + /// A port of 0 will let the OS choose a free port. + pub fn new(port: WebViewerServerPort) -> Result { + let bind_addr = format!("0.0.0.0:{port}").parse()?; + let server = hyper::Server::try_bind(&bind_addr) + .map_err(|e| WebViewerServerError::BindFailed(port, e))? + .serve(MakeSvc); + Ok(Self { server }) } pub async fn serve( self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - ) -> anyhow::Result<()> { + ) -> Result<(), WebViewerServerError> { self.server .with_graceful_shutdown(async { shutdown_rx.recv().await.ok(); }) - .await?; + .await + .map_err(WebViewerServerError::ServeFailed)?; Ok(()) } + + pub fn port(&self) -> WebViewerServerPort { + WebViewerServerPort(self.server.local_addr().port()) + } +} + +/// Sync handle for the [`WebViewerServer`] +/// +/// When dropped, the server will be shut down. +pub struct WebViewerServerHandle { + port: WebViewerServerPort, + shutdown_tx: tokio::sync::broadcast::Sender<()>, +} + +impl Drop for WebViewerServerHandle { + fn drop(&mut self) { + re_log::info!("Shutting down web server on port {}.", self.port); + self.shutdown_tx.send(()).ok(); + } +} + +impl WebViewerServerHandle { + /// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port. + /// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped. + /// + /// A port of 0 will let the OS choose a free port. + /// + /// The caller needs to ensure that there is a `tokio` runtime running. + pub fn new(requested_port: WebViewerServerPort) -> Result { + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + let web_server = WebViewerServer::new(requested_port)?; + + let port = web_server.port(); + + tokio::spawn(async move { web_server.serve(shutdown_rx).await }); + + re_log::info!("Started web server on port {}.", port); + + Ok(Self { port, shutdown_tx }) + } + + /// Get the port where the HTTP server is listening + pub fn port(&self) -> WebViewerServerPort { + self.port + } } diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 84bb08a376d6..df2c41d3a2a9 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -4,7 +4,7 @@ #[tokio::main] async fn main() { re_log::setup_native_logging(); - let port = 9090; + let port = Default::default(); eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); // Shutdown server via Ctrl+C @@ -16,6 +16,7 @@ async fn main() { .expect("Error setting Ctrl-C handler"); re_web_viewer_server::WebViewerServer::new(port) + .expect("Could not create web server") .serve(shutdown_rx) .await .unwrap(); diff --git a/crates/re_ws_comms/Cargo.toml b/crates/re_ws_comms/Cargo.toml index 55c22735d041..3713fb0cce8a 100644 --- a/crates/re_ws_comms/Cargo.toml +++ b/crates/re_ws_comms/Cargo.toml @@ -45,6 +45,7 @@ re_log_types = { workspace = true, features = ["serde"] } anyhow.workspace = true bincode = "1.3" document-features = "0.2" +thiserror.workspace = true # Client: ewebsock = { version = "0.2", optional = true } diff --git a/crates/re_ws_comms/src/client.rs b/crates/re_ws_comms/src/client.rs index 6faaa54dea3d..4c72dfa540a4 100644 --- a/crates/re_ws_comms/src/client.rs +++ b/crates/re_ws_comms/src/client.rs @@ -2,7 +2,8 @@ use std::ops::ControlFlow; use ewebsock::{WsEvent, WsMessage, WsSender}; -use crate::Result; +// TODO(jleibs): use thiserror +pub type Result = anyhow::Result; /// Represents a connection to the server. /// Disconnects on drop. diff --git a/crates/re_ws_comms/src/lib.rs b/crates/re_ws_comms/src/lib.rs index 41c4d77d2de4..e38527f04dc4 100644 --- a/crates/re_ws_comms/src/lib.rs +++ b/crates/re_ws_comms/src/lib.rs @@ -6,18 +6,18 @@ #[cfg(feature = "client")] mod client; +use std::{fmt::Display, str::FromStr}; + #[cfg(feature = "client")] pub use client::Connection; #[cfg(feature = "server")] mod server; #[cfg(feature = "server")] -pub use server::Server; +pub use server::{RerunServer, RerunServerHandle}; use re_log_types::LogMsg; -pub type Result = anyhow::Result; - pub const DEFAULT_WS_SERVER_PORT: u16 = 9877; #[cfg(feature = "tls")] @@ -26,8 +26,58 @@ pub const PROTOCOL: &str = "wss"; #[cfg(not(feature = "tls"))] pub const PROTOCOL: &str = "ws"; -pub fn default_server_url(hostname: &str) -> String { - format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}") +// ---------------------------------------------------------------------------- + +#[derive(thiserror::Error, Debug)] +pub enum RerunServerError { + #[error("failed to bind to port {0}: {1}")] + BindFailed(RerunServerPort, std::io::Error), + + #[error("received an invalid message")] + InvalidMessagePrefix, + + #[error("received an invalid message")] + InvalidMessage(#[from] bincode::Error), + + #[cfg(feature = "server")] + #[error("failed to join web viewer server task: {0}")] + JoinError(#[from] tokio::task::JoinError), + + #[cfg(feature = "server")] + #[error("tokio error: {0}")] + TokioIoError(#[from] tokio::io::Error), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +/// Typed port for use with [`RerunServer`] +pub struct RerunServerPort(pub u16); + +impl Default for RerunServerPort { + fn default() -> Self { + Self(DEFAULT_WS_SERVER_PORT) + } +} + +impl Display for RerunServerPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +// Needed for clap +impl FromStr for RerunServerPort { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.parse::() { + Ok(port) => Ok(RerunServerPort(port)), + Err(err) => Err(format!("Failed to parse port: {err}")), + } + } +} + +pub fn server_url(hostname: &str, port: RerunServerPort) -> String { + format!("{PROTOCOL}://{hostname}:{port}") } const PREFIX: [u8; 4] = *b"RR00"; @@ -41,14 +91,11 @@ pub fn encode_log_msg(log_msg: &LogMsg) -> Vec { bytes } -pub fn decode_log_msg(data: &[u8]) -> Result { +pub fn decode_log_msg(data: &[u8]) -> Result { let payload = data .strip_prefix(&PREFIX) - .ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?; + .ok_or(RerunServerError::InvalidMessagePrefix)?; - use anyhow::Context as _; use bincode::Options as _; - bincode::DefaultOptions::new() - .deserialize(payload) - .context("bincode") + Ok(bincode::DefaultOptions::new().deserialize(payload)?) } diff --git a/crates/re_ws_comms/src/server.rs b/crates/re_ws_comms/src/server.rs index 6830cd5d57a1..d6fde09d2851 100644 --- a/crates/re_ws_comms/src/server.rs +++ b/crates/re_ws_comms/src/server.rs @@ -16,28 +16,34 @@ use tokio_tungstenite::{accept_async, tungstenite::Error}; use re_log_types::LogMsg; use re_smart_channel::Receiver; -// ---------------------------------------------------------------------------- +use crate::{server_url, RerunServerError, RerunServerPort}; -pub struct Server { +/// Websocket host for relaying [`LogMsg`]s to a web viewer. +pub struct RerunServer { listener: TcpListener, + port: RerunServerPort, } -impl Server { - /// Start a pub-sub server listening on the given port - pub async fn new(port: u16) -> anyhow::Result { - use anyhow::Context as _; - +impl RerunServer { + /// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket. + /// The websocket will be available at `port`. + /// + /// A port of 0 will let the OS choose a free port. + pub async fn new(port: RerunServerPort) -> Result { let bind_addr = format!("0.0.0.0:{port}"); let listener = TcpListener::bind(&bind_addr) .await - .with_context(|| format!("Can't listen on {bind_addr:?}"))?; + .map_err(|e| RerunServerError::BindFailed(port, e))?; + + let port = RerunServerPort(listener.local_addr()?.port()); re_log::info!( - "Listening for websocket traffic on {bind_addr}. Connect with a web Rerun Viewer." + "Listening for websocket traffic on {}. Connect with a Rerun Web Viewer.", + listener.local_addr()? ); - Ok(Self { listener }) + Ok(Self { listener, port }) } /// Accept new connections until we get a message on `shutdown_rx` @@ -45,9 +51,7 @@ impl Server { self, rx: Receiver, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - ) -> anyhow::Result<()> { - use anyhow::Context as _; - + ) -> Result<(), RerunServerError> { let history = Arc::new(Mutex::new(Vec::new())); let log_stream = to_broadcast_stream(rx, history.clone()); @@ -60,9 +64,7 @@ impl Server { } }; - let peer = tcp_stream - .peer_addr() - .context("connected streams should have a peer address")?; + let peer = tcp_stream.peer_addr()?; tokio::spawn(accept_connection( log_stream.clone(), peer, @@ -71,6 +73,62 @@ impl Server { )); } } + + pub fn server_url(&self) -> String { + server_url("localhost", self.port) + } +} + +/// Sync handle for the [`RerunServer`] +/// +/// When dropped, the server will be shut down. +pub struct RerunServerHandle { + port: RerunServerPort, + shutdown_tx: tokio::sync::broadcast::Sender<()>, +} + +impl Drop for RerunServerHandle { + fn drop(&mut self) { + re_log::info!("Shutting down Rerun server on port {}.", self.port); + self.shutdown_tx.send(()).ok(); + } +} + +impl RerunServerHandle { + /// Create new [`RerunServer`] to relay [`LogMsg`]s to a websocket. + /// Returns a [`RerunServerHandle`] that will shutdown the server when dropped. + /// + /// A port of 0 will let the OS choose a free port. + /// + /// The caller needs to ensure that there is a `tokio` runtime running. + pub fn new( + rerun_rx: Receiver, + requested_port: RerunServerPort, + ) -> Result { + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + let rt = tokio::runtime::Handle::current(); + + let ws_server = rt.block_on(tokio::spawn(async move { + let ws_server = RerunServer::new(requested_port).await; + ws_server + }))??; + + let port = ws_server.port; + + tokio::spawn(async move { ws_server.listen(rerun_rx, shutdown_rx).await }); + + Ok(Self { port, shutdown_tx }) + } + + /// Get the port where the websocket server is listening + pub fn port(&self) -> RerunServerPort { + self.port + } + + pub fn server_url(&self) -> String { + server_url("localhost", self.port) + } } fn to_broadcast_stream( diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 29ba016d81ca..fddb1de26aa6 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -2,6 +2,11 @@ use std::{net::SocketAddr, path::PathBuf}; +#[cfg(feature = "web_viewer")] +use re_web_viewer_server::WebViewerServerPort; +#[cfg(feature = "web_viewer")] +use re_ws_comms::RerunServerPort; + use crate::Session; // --- @@ -103,7 +108,11 @@ impl RerunArgs { #[cfg(feature = "web_viewer")] RerunBehavior::Serve => { let open_browser = true; - crate::web_viewer::new_sink(open_browser) + crate::web_viewer::new_sink( + open_browser, + WebViewerServerPort::default(), + RerunServerPort::default(), + )? } #[cfg(feature = "native_viewer")] diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 0cbebd6ed75b..979c05705ff7 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -5,6 +5,10 @@ use re_smart_channel::Receiver; use anyhow::Context as _; use clap::Subcommand; +#[cfg(feature = "web_viewer")] +use re_web_viewer_server::WebViewerServerPort; +#[cfg(feature = "web_viewer")] +use re_ws_comms::RerunServerPort; #[cfg(feature = "web_viewer")] use crate::web_viewer::host_web_viewer; @@ -67,7 +71,7 @@ struct Args { #[clap(long, default_value_t = true)] persist_state: bool, - /// What TCP port do we listen to (for SDK:s to connect to)? + /// What TCP port do we listen to (for SDKs to connect to)? #[cfg(feature = "server")] #[clap(long, default_value_t = re_sdk_comms::DEFAULT_SERVER_PORT)] port: u16, @@ -106,6 +110,18 @@ struct Args { /// Requires Rerun to have been compiled with the 'web_viewer' feature. #[clap(long)] web_viewer: bool, + + /// What port do we listen to for hosting the web viewer over HTTP. + /// A port of 0 will pick a random port. + #[cfg(feature = "web_viewer")] + #[clap(long, default_value_t = Default::default())] + web_viewer_port: WebViewerServerPort, + + /// What port do we listen to for incoming websocket connections from the viewer + /// A port of 0 will pick a random port. + #[cfg(feature = "web_viewer")] + #[clap(long, default_value_t = Default::default())] + ws_server_port: RerunServerPort, } #[derive(Debug, Clone, Subcommand)] @@ -303,8 +319,14 @@ async fn run_impl( if args.web_viewer { #[cfg(feature = "web_viewer")] { - let web_viewer = - host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe()); + let web_viewer = host_web_viewer( + args.web_viewer_port, + true, + rerun_server_ws_url, + shutdown_rx.resubscribe(), + ); + // We return here because the running [`WebViewerServer`] is all we need. + // The page we open will be pointed at a websocket url hosted by a *different* server. return web_viewer.await; } #[cfg(not(feature = "web_viewer"))] @@ -356,7 +378,9 @@ async fn run_impl( #[cfg(feature = "web_viewer")] { #[cfg(feature = "server")] - if args.url_or_path.is_none() && args.port == re_ws_comms::DEFAULT_WS_SERVER_PORT { + if args.url_or_path.is_none() + && (args.port == args.web_viewer_port.0 || args.port == args.ws_server_port.0) + { anyhow::bail!( "Trying to spawn a websocket server on {}, but this port is \ already used by the server we're connecting to. Please specify a different port.", @@ -369,17 +393,21 @@ async fn run_impl( let shutdown_web_viewer = shutdown_rx.resubscribe(); // This is the server which the web viewer will talk to: - let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?; + let ws_server = re_ws_comms::RerunServer::new(args.ws_server_port).await?; + let ws_server_url = ws_server.server_url(); let ws_server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server)); - let ws_server_url = re_ws_comms::default_server_url("127.0.0.1"); // This is the server that serves the Wasm+HTML: - let web_server_handle = - tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer)); + let web_server_handle = tokio::spawn(host_web_viewer( + args.web_viewer_port, + true, + ws_server_url, + shutdown_web_viewer, + )); // Wait for both servers to shutdown. web_server_handle.await?.ok(); - return ws_server_handle.await?; + return ws_server_handle.await?.map_err(anyhow::Error::from); } #[cfg(not(feature = "web_viewer"))] diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index 981ece53f92c..bc1fc6f3396e 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -1,78 +1,78 @@ use re_log_types::LogMsg; - -/// Hosts two servers: -/// * A web-server, serving the web-viewer -/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). -struct RemoteViewerServer { +use re_web_viewer_server::{WebViewerServerHandle, WebViewerServerPort}; +use re_ws_comms::{RerunServerHandle, RerunServerPort}; + +/// A [`crate::sink::LogSink`] tied to a hosted Rerun web viewer. This internally stores two servers: +/// * A [`re_ws_comms::RerunServer`] to relay messages from the sink to a websocket connection +/// * A [`re_web_viewer_server::WebViewerServer`] to serve the Wasm+HTML +struct WebViewerSink { + /// Sender to send messages to the [`re_ws_comms::RerunServer`] sender: re_smart_channel::Sender, - shutdown_tx: tokio::sync::broadcast::Sender<()>, -} -impl Drop for RemoteViewerServer { - fn drop(&mut self) { - re_log::info!("Shutting down web server."); - self.shutdown_tx.send(()).ok(); - } + /// Handle to keep the [`re_ws_comms::RerunServer`] alive + _rerun_server: RerunServerHandle, + + /// Handle to keep the [`re_web_viewer_server::WebViewerServer`] alive + _webviewer_server: WebViewerServerHandle, } -impl RemoteViewerServer { - pub fn new(open_browser: bool) -> Self { +impl WebViewerSink { + pub fn new( + open_browser: bool, + web_port: WebViewerServerPort, + ws_port: RerunServerPort, + ) -> anyhow::Result { let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); - let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); - let shutdown_rx_web_server = shutdown_tx.subscribe(); - tokio::spawn(async move { - // This is the server which the web viewer will talk to: - let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) - .await - .unwrap(); - let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); - let ws_server_url = re_ws_comms::default_server_url("127.0.0.1"); + let rerun_server = RerunServerHandle::new(rerun_rx, ws_port)?; + let webviewer_server = WebViewerServerHandle::new(web_port)?; - // This is the server that serves the Wasm+HTML: - let web_server_handle = tokio::spawn(host_web_viewer( - open_browser, - ws_server_url, - shutdown_rx_web_server, - )); + let web_port = webviewer_server.port(); + let server_url = rerun_server.server_url(); + let viewer_url = format!("http://127.0.0.1:{web_port}?url={server_url}"); - ws_server_handle.await.unwrap().unwrap(); - web_server_handle.await.unwrap().unwrap(); - }); + re_log::info!("Web server is running - view it at {viewer_url}"); + if open_browser { + webbrowser::open(&viewer_url).ok(); + } - Self { + Ok(Self { sender: rerun_tx, - shutdown_tx, - } + _rerun_server: rerun_server, + _webviewer_server: webviewer_server, + }) } } -/// Hosts two servers: -/// * A web-server, serving the web-viewer -/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). +/// Async helper to spawn an instance of the [`re_web_viewer_server::WebViewerServer`]. +/// This serves the HTTP+Wasm+JS files that make up the web-viewer. /// -/// Optionally opens a browser with the web-viewer. +/// Optionally opens a browser with the web-viewer and connects to the provided `target_url`. +/// This url could be a hosted RRD file or a `ws://` url to a running [`re_ws_comms::RerunServer`]. +/// +/// Note: this does not include the websocket server. #[cfg(feature = "web_viewer")] pub async fn host_web_viewer( + web_port: WebViewerServerPort, open_browser: bool, - ws_server_url: String, + source_url: String, shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) -> anyhow::Result<()> { - let web_port = 9090; - let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); + let web_server = re_web_viewer_server::WebViewerServer::new(web_port)?; + let port = web_server.port(); + let web_server_handle = web_server.serve(shutdown_rx); - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx)); + let viewer_url = format!("http://127.0.0.1:{port}?url={source_url}"); re_log::info!("Web server is running - view it at {viewer_url}"); if open_browser { webbrowser::open(&viewer_url).ok(); } - web_server_handle.await? + web_server_handle.await.map_err(anyhow::Error::msg) } -impl crate::sink::LogSink for RemoteViewerServer { +impl crate::sink::LogSink for WebViewerSink { fn send(&self, msg: LogMsg) { if let Err(err) = self.sender.send(msg) { re_log::error_once!("Failed to send log message to web server: {err}"); @@ -94,47 +94,15 @@ impl crate::sink::LogSink for RemoteViewerServer { /// This function returns immediately. /// /// The caller needs to ensure that there is a `tokio` runtime running. -#[must_use] -pub fn new_sink(open_browser: bool) -> Box { - Box::new(RemoteViewerServer::new(open_browser)) -} - -/// Hosts the rerun web assets only -pub struct HostAssets { - web_port: u16, - shutdown_tx: tokio::sync::broadcast::Sender<()>, -} - -impl Drop for HostAssets { - fn drop(&mut self) { - re_log::info!("Shutting down web server."); - self.shutdown_tx.send(()).ok(); - } -} - -impl HostAssets { - /// Create new web server hosting rerun assets on `web_port` - /// - /// The caller needs to ensure that there is a `tokio` runtime running. - #[cfg(feature = "web_viewer")] - pub fn new(web_port: u16) -> Self { - let (shutdown_tx, shutdown_rx_web_server) = tokio::sync::broadcast::channel(1); - - tokio::spawn(async move { - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx_web_server)); - - web_server_handle.await.unwrap().unwrap(); - }); - - Self { - web_port, - shutdown_tx, - } - } - - /// Get the port where the web assets are hosted - pub fn get_port(&self) -> u16 { - self.web_port - } +#[must_use = "the sink must be kept around to keep the servers running"] +pub fn new_sink( + open_browser: bool, + web_port: WebViewerServerPort, + ws_port: RerunServerPort, +) -> anyhow::Result> { + Ok(Box::new(WebViewerSink::new( + open_browser, + web_port, + ws_port, + )?)) } diff --git a/examples/python/notebook/cube.ipynb b/examples/python/notebook/cube.ipynb index 12d3699bfffe..38413daece43 100644 --- a/examples/python/notebook/cube.ipynb +++ b/examples/python/notebook/cube.ipynb @@ -20,7 +20,7 @@ "# Uncomment to use locally-hosted assets. Necessary for local-builds or offline setups.\n", "# Won't work for remotely hosted notebook environments.\n", "\n", - "# rr.self_host_assets()" + "# rr.start_web_viewer_server()" ] }, { diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 422a89c466d1..9a48f157f713 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -35,7 +35,7 @@ native_viewer = ["rerun/native_viewer"] ## ## You also need to install some additional tools, which you can do by running ## [`scripts/setup_web.sh`](https://github.com/rerun-io/rerun/blob/main/scripts/setup_web.sh). -web_viewer = ["rerun/web_viewer"] +web_viewer = ["rerun/web_viewer", "dep:re_web_viewer_server", "dep:re_ws_comms"] [dependencies] @@ -50,6 +50,8 @@ rerun = { workspace = true, default-features = false, features = [ "server", "sdk", ] } +re_web_viewer_server = { workspace = true, optional = true } +re_ws_comms = { workspace = true, optional = true } arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } document-features = "0.2" diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py index f0e64608e01a..89554f33b6d1 100644 --- a/rerun_py/rerun_sdk/rerun/__init__.py +++ b/rerun_py/rerun_sdk/rerun/__init__.py @@ -338,7 +338,7 @@ def spawn(port: int = 9876, connect: bool = True) -> None: _spawn = spawn # we need this because Python scoping is horrible -def serve(open_browser: bool = True) -> None: +def serve(open_browser: bool = True, web_port: Optional[int] = None, ws_port: Optional[int] = None) -> None: """ Serve log-data over WebSockets and serve a Rerun web viewer over HTTP. @@ -352,31 +352,34 @@ def serve(open_browser: bool = True) -> None: ---------- open_browser Open the default browser to the viewer. - + web_port: + The port to serve the web viewer on (defaults to 9090). + ws_port: + The port to serve the WebSocket server on (defaults to 9877) """ if not bindings.is_enabled(): print("Rerun is disabled - serve() call ignored") return - bindings.serve(open_browser) + bindings.serve(open_browser, web_port, ws_port) -def self_host_assets(port: int = 9090) -> None: +def start_web_viewer_server(port: int = 0) -> None: """ Self-host the rerun web-viewer assets on a specified port. Parameters ---------- port - Port to serve assets on. Pass None to disable. (Defaults to 9090) + Port to serve assets on. Defaults to 0 (random port). """ if not bindings.is_enabled(): print("Rerun is disabled - self_host_assets() call ignored") return - bindings.self_host_assets(port) + bindings.start_web_viewer_server(port) def disconnect() -> None: diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 2acbee5d8f87..c7035761d336 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -30,6 +30,9 @@ pub use rerun::{ coordinates::{Axis3, Handedness, Sign, SignedAxis3}, }; +use re_web_viewer_server::WebViewerServerPort; +use re_ws_comms::RerunServerPort; + use crate::{arrow::get_registered_component_names, python_session::PythonSession}; // ---------------------------------------------------------------------------- @@ -142,7 +145,7 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(is_enabled, m)?)?; m.add_function(wrap_pyfunction!(memory_recording, m)?)?; m.add_function(wrap_pyfunction!(save, m)?)?; - m.add_function(wrap_pyfunction!(self_host_assets, m)?)?; + m.add_function(wrap_pyfunction!(start_web_viewer_server, m)?)?; m.add_function(wrap_pyfunction!(serve, m)?)?; m.add_function(wrap_pyfunction!(set_enabled, m)?)?; m.add_function(wrap_pyfunction!(shutdown, m)?)?; @@ -312,7 +315,7 @@ fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> { /// Serve a web-viewer. #[allow(clippy::unnecessary_wraps)] // False positive #[pyfunction] -fn serve(open_browser: bool) -> PyResult<()> { +fn serve(open_browser: bool, web_port: Option, ws_port: Option) -> PyResult<()> { #[cfg(feature = "web_viewer")] { let mut session = python_session(); @@ -324,7 +327,14 @@ fn serve(open_browser: bool) -> PyResult<()> { let _guard = enter_tokio_runtime(); - session.set_sink(rerun::web_viewer::new_sink(open_browser)); + session.set_sink( + rerun::web_viewer::new_sink( + open_browser, + web_port.map(WebViewerServerPort).unwrap_or_default(), + ws_port.map(RerunServerPort).unwrap_or_default(), + ) + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, + ); Ok(()) } @@ -339,12 +349,24 @@ fn serve(open_browser: bool) -> PyResult<()> { } #[pyfunction] -fn self_host_assets(port: Option) -> PyResult<()> { - let mut session = python_session(); - let _guard = enter_tokio_runtime(); - session - .self_host_assets(port) - .map_err(|err| PyRuntimeError::new_err(err.to_string())) +// TODO(jleibs) expose this as a python type +fn start_web_viewer_server(port: u16) -> PyResult<()> { + #[cfg(feature = "web_viewer")] + { + let mut session = python_session(); + let _guard = enter_tokio_runtime(); + session + .start_web_viewer_server(WebViewerServerPort(port)) + .map_err(|err| PyRuntimeError::new_err(err.to_string())) + } + + #[cfg(not(feature = "web_viewer"))] + { + _ = open_browser; + Err(PyRuntimeError::new_err( + "The Rerun SDK was not compiled with the 'web_viewer' feature", + )) + } } #[pyfunction] diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index c1c7991057d7..696857b7b8a6 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -6,8 +6,8 @@ use re_log_types::{ RecordingId, RecordingInfo, RecordingSource, RowId, Time, TimePoint, }; +use re_web_viewer_server::WebViewerServerPort; use rerun::sink::LogSink; - // ---------------------------------------------------------------------------- #[derive(thiserror::Error, Debug)] @@ -15,6 +15,9 @@ pub enum PythonSessionError { #[allow(dead_code)] #[error("The Rerun SDK was not compiled with the '{0}' feature")] FeatureNotEnabled(&'static str), + + #[error("Could not start the WebViewerServer: '{0}'")] + WebViewerServerError(#[from] re_web_viewer_server::WebViewerServerError), } /// Used to construct a [`RecordingInfo`]: @@ -81,7 +84,7 @@ pub struct PythonSession { /// Used to serve the web viewer assets. /// TODO(jleibs): Potentially use this for serve as well #[cfg(feature = "web_viewer")] - self_hosted_web_viewer: Option, + web_viewer_server: Option, } impl Default for PythonSession { @@ -94,7 +97,7 @@ impl Default for PythonSession { sink: Box::new(rerun::sink::BufferedSink::new()), build_info: re_build_info::build_info!(), #[cfg(feature = "web_viewer")] - self_hosted_web_viewer: None, + web_viewer_server: None, } } } @@ -312,8 +315,8 @@ impl PythonSession { /// whether `host_assets` was called. pub fn get_app_url(&self) -> String { #[cfg(feature = "web_viewer")] - if let Some(hosted_assets) = &self.self_hosted_web_viewer { - return format!("http://localhost:{}", hosted_assets.get_port()); + if let Some(hosted_assets) = &self.web_viewer_server { + return format!("http://localhost:{}", hosted_assets.port()); } let short_git_hash = &self.build_info.git_hash[..7]; @@ -324,17 +327,13 @@ impl PythonSession { /// /// The caller needs to ensure that there is a `tokio` runtime running. #[allow(clippy::unnecessary_wraps)] - pub fn self_host_assets(&mut self, _web_port: Option) -> Result<(), PythonSessionError> { - #[cfg(feature = "web_viewer")] - { - self.self_hosted_web_viewer = _web_port.map(rerun::web_viewer::HostAssets::new); - - Ok(()) - } + #[cfg(feature = "web_viewer")] + pub fn start_web_viewer_server( + &mut self, + _web_port: WebViewerServerPort, + ) -> Result<(), PythonSessionError> { + self.web_viewer_server = Some(re_web_viewer_server::WebViewerServerHandle::new(_web_port)?); - #[cfg(not(feature = "web_viewer"))] - { - Err(PythonSessionError::FeatureNotEnabled("web_viewer")) - } + Ok(()) } }