diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 5ad62a9652b9..2d9c72677619 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -132,14 +132,15 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(get_recording_id, m)?)?; m.add_function(wrap_pyfunction!(set_recording_id, m)?)?; - m.add_function(wrap_pyfunction!(init, m)?)?; m.add_function(wrap_pyfunction!(connect, m)?)?; - m.add_function(wrap_pyfunction!(serve, m)?)?; - m.add_function(wrap_pyfunction!(shutdown, m)?)?; - m.add_function(wrap_pyfunction!(is_enabled, m)?)?; - m.add_function(wrap_pyfunction!(set_enabled, m)?)?; m.add_function(wrap_pyfunction!(disconnect, m)?)?; + m.add_function(wrap_pyfunction!(flush, m)?)?; + m.add_function(wrap_pyfunction!(init, m)?)?; + m.add_function(wrap_pyfunction!(is_enabled, m)?)?; m.add_function(wrap_pyfunction!(save, m)?)?; + m.add_function(wrap_pyfunction!(serve, m)?)?; + m.add_function(wrap_pyfunction!(set_enabled, m)?)?; + m.add_function(wrap_pyfunction!(shutdown, m)?)?; m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?; m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?; @@ -329,15 +330,10 @@ fn serve(open_browser: bool) -> PyResult<()> { #[pyfunction] fn shutdown(py: Python<'_>) { - // Release the GIL in case any flushing behavior needs to - // cleanup a python object. - py.allow_threads(|| { - re_log::debug!("Shutting down the Rerun SDK"); - let mut session = python_session(); - session.drop_msgs_if_disconnected(); - session.flush(); - session.disconnect(); - }); + re_log::debug!("Shutting down the Rerun SDK"); + // Disconnect the current sink which ensures that + // it flushes and cleans up. + disconnect(py); } /// Is logging enabled in the global session? @@ -355,13 +351,27 @@ fn set_enabled(enabled: bool) { python_session().set_enabled(enabled); } +/// Block until outstanding data has been flushed to the sink +#[pyfunction] +fn flush(py: Python<'_>) { + // Release the GIL in case any flushing behavior needs to + // cleanup a python object. + py.allow_threads(|| { + python_session().flush(); + }); +} + /// Disconnect from remote server (if any). /// /// Subsequent log messages will be buffered and either sent on the next call to `connect`, /// or shown with `show`. #[pyfunction] -fn disconnect() { - python_session().disconnect(); +fn disconnect(py: Python<'_>) { + // Release the GIL in case any flushing behavior needs to + // cleanup a python object. + py.allow_threads(|| { + python_session().disconnect(); + }); } #[pyfunction] diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index 532495975fbf..e69fb9a5ed65 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -144,9 +144,23 @@ impl PythonSession { /// If the previous sink is [`rerun::sink::BufferedSink`] (the default), /// it will be drained and sent to the new sink. pub fn set_sink(&mut self, sink: Box) { + // Capture the backlog (should only apply if this was a `BufferedSink`) let backlog = self.sink.drain_backlog(); + + // Before changing the sink, we set drop_if_disconnected and + // flush. This ensures that any messages that are currently + // buffered will be sent. + self.sink.drop_msgs_if_disconnected(); + self.sink.flush(); self.sink = sink; - self.sink.send_all(backlog); + + if backlog.is_empty() { + // If we had no backlog, we need to send the `BeginRecording` message to the new sink. + self.has_sent_begin_recording_msg = false; + } else { + // Otherwise the backlog should have had the `BeginRecording` message in it already. + self.sink.send_all(backlog); + } } /// Send log data to a remote viewer/server. @@ -193,11 +207,6 @@ impl PythonSession { self.sink.flush(); } - /// If the tcp session is disconnected, allow it to quit early and drop unsent messages - pub fn drop_msgs_if_disconnected(&mut self) { - self.sink.drop_msgs_if_disconnected(); - } - /// Send a single [`DataRow`]. pub fn send_row(&mut self, row: DataRow) -> PyResult<()> { let msg = row