From c99240722c7b177ea98ea33b1056604159c512d7 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Apr 2023 19:50:53 +0200 Subject: [PATCH 1/4] Always flush when we remove a sink --- rerun_py/src/python_bridge.rs | 40 ++++++++++++++++++++-------------- rerun_py/src/python_session.rs | 21 +++++++++++++----- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 5ad62a9652b9..3664253b165a 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -132,14 +132,15 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(get_recording_id, m)?)?; m.add_function(wrap_pyfunction!(set_recording_id, m)?)?; - m.add_function(wrap_pyfunction!(init, m)?)?; m.add_function(wrap_pyfunction!(connect, m)?)?; - m.add_function(wrap_pyfunction!(serve, m)?)?; - m.add_function(wrap_pyfunction!(shutdown, m)?)?; - m.add_function(wrap_pyfunction!(is_enabled, m)?)?; - m.add_function(wrap_pyfunction!(set_enabled, m)?)?; m.add_function(wrap_pyfunction!(disconnect, m)?)?; + m.add_function(wrap_pyfunction!(flush, m)?)?; + m.add_function(wrap_pyfunction!(init, m)?)?; + m.add_function(wrap_pyfunction!(is_enabled, m)?)?; m.add_function(wrap_pyfunction!(save, m)?)?; + m.add_function(wrap_pyfunction!(serve, m)?)?; + m.add_function(wrap_pyfunction!(set_enabled, m)?)?; + m.add_function(wrap_pyfunction!(shutdown, m)?)?; m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?; m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?; @@ -329,15 +330,8 @@ fn serve(open_browser: bool) -> PyResult<()> { #[pyfunction] fn shutdown(py: Python<'_>) { - // Release the GIL in case any flushing behavior needs to - // cleanup a python object. - py.allow_threads(|| { - re_log::debug!("Shutting down the Rerun SDK"); - let mut session = python_session(); - session.drop_msgs_if_disconnected(); - session.flush(); - session.disconnect(); - }); + re_log::info!("Shutting down the Rerun SDK"); + disconnect(py); } /// Is logging enabled in the global session? @@ -355,13 +349,27 @@ fn set_enabled(enabled: bool) { python_session().set_enabled(enabled); } +/// Block until outstanding data has been flushed to the sink +#[pyfunction] +fn flush(py: Python<'_>) { + // Release the GIL in case any flushing behavior needs to + // cleanup a python object. + py.allow_threads(|| { + python_session().flush(); + }); +} + /// Disconnect from remote server (if any). /// /// Subsequent log messages will be buffered and either sent on the next call to `connect`, /// or shown with `show`. #[pyfunction] -fn disconnect() { - python_session().disconnect(); +fn disconnect(py: Python<'_>) { + // Release the GIL in case any flushing behavior needs to + // cleanup a python object. + py.allow_threads(|| { + python_session().disconnect(); + }); } #[pyfunction] diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index 532495975fbf..1bd7a055b527 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -144,9 +144,23 @@ impl PythonSession { /// If the previous sink is [`rerun::sink::BufferedSink`] (the default), /// it will be drained and sent to the new sink. pub fn set_sink(&mut self, sink: Box) { + // Capture the backlog (should only applies if this was a `BufferedSink`) let backlog = self.sink.drain_backlog(); + + // Before changing the sink, we set drop_if_disconnected and + // flush. This ensures that any messages that are currently + // buffered will be sent. + self.sink.drop_msgs_if_disconnected(); + self.sink.flush(); self.sink = sink; - self.sink.send_all(backlog); + + if backlog.is_empty() { + // If we had no backlog, we need to send the `BeginRecording` message to the new sink. + self.has_sent_begin_recording_msg = false; + } else { + // Otherwise the backlog should have had the `BeginRecording` message in it already. + self.sink.send_all(backlog); + } } /// Send log data to a remote viewer/server. @@ -193,11 +207,6 @@ impl PythonSession { self.sink.flush(); } - /// If the tcp session is disconnected, allow it to quit early and drop unsent messages - pub fn drop_msgs_if_disconnected(&mut self) { - self.sink.drop_msgs_if_disconnected(); - } - /// Send a single [`DataRow`]. pub fn send_row(&mut self, row: DataRow) -> PyResult<()> { let msg = row From 4f426789a41262721701bd4a84252f77c05155f3 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Apr 2023 19:59:13 +0200 Subject: [PATCH 2/4] comment --- rerun_py/src/python_bridge.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 3664253b165a..2f22f1e28605 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -331,6 +331,8 @@ fn serve(open_browser: bool) -> PyResult<()> { #[pyfunction] fn shutdown(py: Python<'_>) { re_log::info!("Shutting down the Rerun SDK"); + // Disconnect the current sink which ensures that + // it flushes and cleans up. disconnect(py); } From cc97d7f13977b7bdb1033eee547d4d6c5614a579 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 12 Apr 2023 19:59:55 +0200 Subject: [PATCH 3/4] keep debug as debug --- rerun_py/src/python_bridge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 2f22f1e28605..2d9c72677619 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -330,7 +330,7 @@ fn serve(open_browser: bool) -> PyResult<()> { #[pyfunction] fn shutdown(py: Python<'_>) { - re_log::info!("Shutting down the Rerun SDK"); + re_log::debug!("Shutting down the Rerun SDK"); // Disconnect the current sink which ensures that // it flushes and cleans up. disconnect(py); From 33cc46da6c9979f9389904d35d2c371b36f71a4b Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 13 Apr 2023 18:01:47 +0200 Subject: [PATCH 4/4] I swear I speak english --- rerun_py/src/python_session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs index 1bd7a055b527..e69fb9a5ed65 100644 --- a/rerun_py/src/python_session.rs +++ b/rerun_py/src/python_session.rs @@ -144,7 +144,7 @@ impl PythonSession { /// If the previous sink is [`rerun::sink::BufferedSink`] (the default), /// it will be drained and sent to the new sink. pub fn set_sink(&mut self, sink: Box) { - // Capture the backlog (should only applies if this was a `BufferedSink`) + // Capture the backlog (should only apply if this was a `BufferedSink`) let backlog = self.sink.drain_backlog(); // Before changing the sink, we set drop_if_disconnected and