Skip to content

Commit

Permalink
Always flush when we remove a sink (#1830)
Browse files Browse the repository at this point in the history
Whenever we disconnect (or implicitly disconnect by swapping a sink) we should flush the pending messages. Additionally disconnect and flush calls both require releasing the GIL (for the same reason as shutdown previously).
  • Loading branch information
jleibs authored Apr 13, 2023
1 parent 24f22dc commit 48d4f28
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
42 changes: 26 additions & 16 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(get_recording_id, m)?)?;
m.add_function(wrap_pyfunction!(set_recording_id, m)?)?;

m.add_function(wrap_pyfunction!(init, m)?)?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add_function(wrap_pyfunction!(serve, m)?)?;
m.add_function(wrap_pyfunction!(shutdown, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
m.add_function(wrap_pyfunction!(set_enabled, m)?)?;
m.add_function(wrap_pyfunction!(disconnect, m)?)?;
m.add_function(wrap_pyfunction!(flush, m)?)?;
m.add_function(wrap_pyfunction!(init, m)?)?;
m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
m.add_function(wrap_pyfunction!(save, m)?)?;
m.add_function(wrap_pyfunction!(serve, m)?)?;
m.add_function(wrap_pyfunction!(set_enabled, m)?)?;
m.add_function(wrap_pyfunction!(shutdown, m)?)?;

m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?;
m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?;
Expand Down Expand Up @@ -329,15 +330,10 @@ fn serve(open_browser: bool) -> PyResult<()> {

#[pyfunction]
fn shutdown(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
re_log::debug!("Shutting down the Rerun SDK");
let mut session = python_session();
session.drop_msgs_if_disconnected();
session.flush();
session.disconnect();
});
re_log::debug!("Shutting down the Rerun SDK");
// Disconnect the current sink which ensures that
// it flushes and cleans up.
disconnect(py);
}

/// Is logging enabled in the global session?
Expand All @@ -355,13 +351,27 @@ fn set_enabled(enabled: bool) {
python_session().set_enabled(enabled);
}

/// Block until outstanding data has been flushed to the sink
#[pyfunction]
fn flush(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
python_session().flush();
});
}

/// Disconnect from remote server (if any).
///
/// Subsequent log messages will be buffered and either sent on the next call to `connect`,
/// or shown with `show`.
#[pyfunction]
fn disconnect() {
python_session().disconnect();
fn disconnect(py: Python<'_>) {
// Release the GIL in case any flushing behavior needs to
// cleanup a python object.
py.allow_threads(|| {
python_session().disconnect();
});
}

#[pyfunction]
Expand Down
21 changes: 15 additions & 6 deletions rerun_py/src/python_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,23 @@ impl PythonSession {
/// If the previous sink is [`rerun::sink::BufferedSink`] (the default),
/// it will be drained and sent to the new sink.
pub fn set_sink(&mut self, sink: Box<dyn LogSink>) {
// Capture the backlog (should only apply if this was a `BufferedSink`)
let backlog = self.sink.drain_backlog();

// Before changing the sink, we set drop_if_disconnected and
// flush. This ensures that any messages that are currently
// buffered will be sent.
self.sink.drop_msgs_if_disconnected();
self.sink.flush();
self.sink = sink;
self.sink.send_all(backlog);

if backlog.is_empty() {
// If we had no backlog, we need to send the `BeginRecording` message to the new sink.
self.has_sent_begin_recording_msg = false;
} else {
// Otherwise the backlog should have had the `BeginRecording` message in it already.
self.sink.send_all(backlog);
}
}

/// Send log data to a remote viewer/server.
Expand Down Expand Up @@ -193,11 +207,6 @@ impl PythonSession {
self.sink.flush();
}

/// If the tcp session is disconnected, allow it to quit early and drop unsent messages
pub fn drop_msgs_if_disconnected(&mut self) {
self.sink.drop_msgs_if_disconnected();
}

/// Send a single [`DataRow`].
pub fn send_row(&mut self, row: DataRow) -> PyResult<()> {
let msg = row
Expand Down

1 comment on commit 48d4f28

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 48d4f28 Previous: 24f22dc Ratio
datastore/num_rows=1000/num_instances=1000/packed=false/insert/default 2817680 ns/iter (± 19745) 2898207 ns/iter (± 121294) 0.97
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at/default 369 ns/iter (± 2) 370 ns/iter (± 17) 1.00
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/primary/default 261 ns/iter (± 1) 267 ns/iter (± 1) 0.98
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/secondaries/default 418 ns/iter (± 7) 421 ns/iter (± 1) 0.99
datastore/num_rows=1000/num_instances=1000/packed=false/range/default 2942281 ns/iter (± 26155) 3014521 ns/iter (± 108182) 0.98
datastore/num_rows=1000/num_instances=1000/gc/default 2356007 ns/iter (± 16283) 2369779 ns/iter (± 5520) 0.99
mono_points_arrow/generate_message_bundles 25527041 ns/iter (± 1085534) 28442143 ns/iter (± 1609753) 0.90
mono_points_arrow/generate_messages 113407675 ns/iter (± 980132) 126589479 ns/iter (± 1103601) 0.90
mono_points_arrow/encode_log_msg 141226930 ns/iter (± 991972) 155149866 ns/iter (± 1709961) 0.91
mono_points_arrow/encode_total 281888721 ns/iter (± 1637292) 309647636 ns/iter (± 3228362) 0.91
mono_points_arrow/decode_log_msg 176627784 ns/iter (± 5867103) 190813640 ns/iter (± 2207729) 0.93
mono_points_arrow/decode_message_bundles 56767287 ns/iter (± 1365075) 67428539 ns/iter (± 1560376) 0.84
mono_points_arrow/decode_total 237055463 ns/iter (± 1991373) 258683296 ns/iter (± 2835213) 0.92
mono_points_arrow_batched/generate_message_bundles 20201571 ns/iter (± 1059769) 22014530 ns/iter (± 2079295) 0.92
mono_points_arrow_batched/generate_messages 4010825 ns/iter (± 113697) 4355767 ns/iter (± 157573) 0.92
mono_points_arrow_batched/encode_log_msg 1343093 ns/iter (± 8702) 1377381 ns/iter (± 7716) 0.98
mono_points_arrow_batched/encode_total 26198264 ns/iter (± 983941) 28213447 ns/iter (± 3174551) 0.93
mono_points_arrow_batched/decode_log_msg 781933 ns/iter (± 3403) 780993 ns/iter (± 2935) 1.00
mono_points_arrow_batched/decode_message_bundles 7607328 ns/iter (± 187172) 7585376 ns/iter (± 98733) 1.00
mono_points_arrow_batched/decode_total 8725722 ns/iter (± 293203) 8567068 ns/iter (± 309478) 1.02
batch_points_arrow/generate_message_bundles 239420 ns/iter (± 2351) 239158 ns/iter (± 546) 1.00
batch_points_arrow/generate_messages 5027 ns/iter (± 22) 5037 ns/iter (± 22) 1.00
batch_points_arrow/encode_log_msg 259092 ns/iter (± 1653) 257828 ns/iter (± 1637) 1.00
batch_points_arrow/encode_total 534411 ns/iter (± 2576) 529030 ns/iter (± 4465) 1.01
batch_points_arrow/decode_log_msg 208754 ns/iter (± 913) 209005 ns/iter (± 902) 1.00
batch_points_arrow/decode_message_bundles 1855 ns/iter (± 13) 1853 ns/iter (± 14) 1.00
batch_points_arrow/decode_total 217420 ns/iter (± 1159) 214591 ns/iter (± 1089) 1.01
arrow_mono_points/insert 2341117151 ns/iter (± 3876987) 2545905758 ns/iter (± 9660609) 0.92
arrow_mono_points/query 1201265 ns/iter (± 13180) 1242956 ns/iter (± 25666) 0.97
arrow_batch_points/insert 1153355 ns/iter (± 7372) 1159840 ns/iter (± 5425) 0.99
arrow_batch_points/query 14738 ns/iter (± 150) 14582 ns/iter (± 119) 1.01
arrow_batch_vecs/insert 26421 ns/iter (± 116) 26301 ns/iter (± 156) 1.00
arrow_batch_vecs/query 325080 ns/iter (± 1500) 325034 ns/iter (± 847) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.