Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always send recording_id as part of LogMsg #1778

Merged
merged 9 commits into from
Apr 6, 2023
Merged
13 changes: 10 additions & 3 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, MsgId};
use re_log_types::{entity_path, DataRow, MsgId, RecordingId};

fn main() {
log_messages();
Expand Down Expand Up @@ -91,6 +91,7 @@ fn log_messages() {

const NUM_POINTS: usize = 1_000;

let recording_id = RecordingId::random();
let timeline = Timeline::new_sequence("frame_nr");
let mut time_point = TimePoint::default();
time_point.insert(timeline, TimeInt::from(0));
Expand All @@ -116,7 +117,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand All @@ -139,7 +143,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,15 @@ impl LogDb {

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
LogMsg::EntityPathOpMsg(_, msg) => {
let EntityPathOpMsg {
msg_id,
time_point,
path_op,
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

Expand Down
29 changes: 16 additions & 13 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -42,18 +42,18 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap()))
.map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap()))
.collect()
}

fn decode_tables(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg {
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
Expand Down Expand Up @@ -81,21 +81,22 @@ fn mono_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -136,21 +137,22 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow_batched");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_table);
});
let tables = [generate_table()];
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()])));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -192,21 +194,22 @@ fn batch_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
20 changes: 14 additions & 6 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ pub enum LogMsg {
BeginRecordingMsg(BeginRecordingMsg),

/// Server-backed operation on an [`EntityPath`].
EntityPathOpMsg(EntityPathOpMsg),
EntityPathOpMsg(RecordingId, EntityPathOpMsg),

/// Log an entity using an [`ArrowMsg`].
ArrowMsg(ArrowMsg),
ArrowMsg(RecordingId, ArrowMsg),

/// Sent when the client shuts down the connection.
Goodbye(MsgId),
Expand All @@ -186,19 +186,27 @@ impl LogMsg {
pub fn id(&self) -> MsgId {
match self {
Self::BeginRecordingMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(_, msg) => msg.msg_id,
Self::Goodbye(msg_id) => *msg_id,
// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
Self::ArrowMsg(msg) => msg.table_id,
Self::ArrowMsg(_, msg) => msg.table_id,
}
}

/// The [`RecordingId`] this message is associated with, if any.
///
/// [`Self::BeginRecordingMsg`] carries it inside its recording info, while
/// [`Self::EntityPathOpMsg`] and [`Self::ArrowMsg`] carry it as their first
/// tuple field. [`Self::Goodbye`] is not tied to a recording, hence `None`.
pub fn recording_id(&self) -> Option<&RecordingId> {
    match self {
        Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
        Self::EntityPathOpMsg(recording_id, _) => Some(recording_id),
        Self::ArrowMsg(recording_id, _) => Some(recording_id),
        Self::Goodbye(_) => None,
    }
}
}

impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg);
impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg);
impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg);

// ----------------------------------------------------------------------------

Expand Down
31 changes: 23 additions & 8 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError};
use std::borrow::Borrow;

use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId};

use crate::{
components::Transform,
log::{DataCell, LogMsg, MsgId},
sink::LogSink,
time::{Time, TimeInt, TimePoint, Timeline},
Component, EntityPath, SerializableComponent,
Component, EntityPath, SerializableComponent, Session,
};

// TODO(#1619): Rust SDK batching
Expand Down Expand Up @@ -229,29 +231,42 @@ impl MsgSender {

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
pub fn send(self, sink: &impl std::borrow::Borrow<dyn LogSink>) -> Result<(), DataTableError> {
self.send_to_sink(sink.borrow())
pub fn send(self, session: &Session) -> Result<(), DataTableError> {
self.send_to_sink(session.recording_id(), session.borrow())
}

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> {
fn send_to_sink(
self,
recording_id: RecordingId,
sink: &dyn LogSink,
) -> Result<(), DataTableError> {
if !sink.is_enabled() {
return Ok(()); // silently drop the message
}

let [row_standard, row_transforms, row_splats] = self.into_rows();

if let Some(row_transforms) = row_transforms {
sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_transforms.into_table()).try_into()?,
));
}
if let Some(row_splats) = row_splats {
sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_splats.into_table()).try_into()?,
));
}
// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_standard.into_table()).try_into()?,
));
}

Ok(())
Expand Down
36 changes: 29 additions & 7 deletions crates/re_sdk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl SessionBuilder {
#[must_use]
#[derive(Clone)]
pub struct Session {
recording_info: RecordingInfo,
sink: Arc<dyn LogSink>,
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
Expand Down Expand Up @@ -222,20 +223,33 @@ impl Session {
sink.send(
re_log_types::BeginRecordingMsg {
msg_id: re_log_types::MsgId::random(),
info: recording_info,
info: recording_info.clone(),
}
.into(),
);
}

Self { sink: sink.into() }
Self {
recording_info,
sink: sink.into(),
}
}

/// Construct a new session with a disabled "dummy" sink that drops all logging messages.
///
/// [`Self::is_enabled`] will return `false`.
pub fn disabled() -> Self {
    // Even a disabled session needs recording info, since `recording_id()`
    // and friends read from it.
    let recording_info = RecordingInfo {
        application_id: ApplicationId::unknown(),
        recording_id: Default::default(),
        is_official_example: crate::called_from_official_rust_example(),
        started: Time::now(),
        recording_source: RecordingSource::RustSdk {
            rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
            llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
        },
    };

    Self {
        recording_info,
        sink: crate::sink::disabled().into(),
    }
}
Expand Down Expand Up @@ -272,17 +286,25 @@ impl Session {
time_point: &re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
}));
self.send(LogMsg::EntityPathOpMsg(
self.recording_id(),
re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
},
));
}

/// Drain all buffered [`LogMsg`]es and return them.
///
/// Delegates to the underlying sink; whether anything is actually buffered
/// depends on the configured [`LogSink`] implementation.
pub fn drain_backlog(&self) -> Vec<LogMsg> {
    self.sink.drain_backlog()
}

/// The current [`RecordingId`].
///
/// Returned by value, read from the recording info this session was created with.
pub fn recording_id(&self) -> RecordingId {
    self.recording_info.recording_id
}
}

impl AsRef<dyn LogSink> for Session {
Expand Down
6 changes: 4 additions & 2 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ impl CongestionManager {
#[allow(clippy::match_same_arms)]
match msg {
// we don't want to drop any of these
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true,
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_, _) | LogMsg::Goodbye(_) => {
true
}

LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
LogMsg::ArrowMsg(_, arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
}
}

Expand Down
Loading