From 0a4ae966457d683e0d5760dcf17749406d471bff Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 14 Nov 2024 15:40:02 +0100 Subject: [PATCH] WebSocket server now indefinitely keeps track of non-data RPC commands --- crates/store/re_ws_comms/src/server.rs | 28 +++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/crates/store/re_ws_comms/src/server.rs b/crates/store/re_ws_comms/src/server.rs index 08c737921c3f..20d831260627 100644 --- a/crates/store/re_ws_comms/src/server.rs +++ b/crates/store/re_ws_comms/src/server.rs @@ -28,6 +28,9 @@ use crate::{server_url, RerunServerError, RerunServerPort}; struct MessageQueue { server_memory_limit: MemoryLimit, messages: VecDeque>, + + /// Never garbage collected. + messages_static: VecDeque>, } impl MessageQueue { @@ -35,6 +38,7 @@ impl MessageQueue { Self { server_memory_limit, messages: Default::default(), + messages_static: Default::default(), } } @@ -43,6 +47,15 @@ impl MessageQueue { self.messages.push_back(msg); } + /// Messages pushed using this method will stay around indefinitely. + /// + /// Useful e.g. for `SetStoreInfo` messages, so that clients late to the party actually get a + /// chance of receiving them. + pub fn push_static(&mut self, msg: Vec) { + self.gc_if_using_too_much_ram(); + self.messages_static.push_back(msg); + } + fn gc_if_using_too_much_ram(&mut self) { re_tracing::profile_function!(); @@ -356,7 +369,13 @@ impl ReceiveSetBroadcaster { } }); - inner.history.push(msg); + let msg_is_data = matches!(data, LogMsg::ArrowMsg(_, _)); + if msg_is_data { + inner.history.push(msg); + } else { + // Keep non-data commands around for clients late to the party. + inner.history.push_static(msg); + } } re_smart_channel::SmartMessagePayload::Flush { on_flush_done } => { @@ -386,6 +405,13 @@ impl ReceiveSetBroadcaster { // Meaning that if a new one connects, we stall the old connections until we have sent all messages to this one. let mut inner = self.inner.lock(); + for msg in &inner.history.messages_static { + if let Err(err) = client.send(tungstenite::Message::Binary(msg.clone())) { + re_log::warn!("Error sending static message to web socket client: {err}"); + return; + } + } + for msg in &inner.history.messages { if let Err(err) = client.send(tungstenite::Message::Binary(msg.clone())) { re_log::warn!("Error sending message to web socket client: {err}");