From e647f07fd5f79cbc25e210983d87dfa514e0ad3d Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 24 Mar 2023 11:17:17 +0100 Subject: [PATCH] introduce DataRow and DataTable --- crates/re_arrow_store/benches/data_store.rs | 51 +- .../re_arrow_store/examples/dump_dataframe.rs | 67 ++- .../examples/latest_component.rs | 10 +- .../examples/latest_components.rs | 10 +- .../examples/range_components.rs | 30 +- crates/re_arrow_store/src/store_polars.rs | 2 + crates/re_arrow_store/src/store_write.rs | 133 ++--- crates/re_arrow_store/src/test_util.rs | 14 +- crates/re_arrow_store/tests/correctness.rs | 54 +- crates/re_arrow_store/tests/data_store.rs | 388 ++++++-------- crates/re_arrow_store/tests/internals.rs | 43 +- crates/re_data_store/examples/memory_usage.rs | 35 +- crates/re_data_store/src/log_db.rs | 50 +- .../benches/msg_encode_benchmark.rs | 44 +- crates/re_log_types/src/arrow_msg.rs | 14 +- .../src/component_types/tensor.rs | 31 +- crates/re_log_types/src/data_cell.rs | 12 +- crates/re_log_types/src/data_row.rs | 507 ++++++++++++++++++ crates/re_log_types/src/data_table.rs | 275 ++++++++++ crates/re_log_types/src/lib.rs | 4 + crates/re_log_types/src/msg_bundle.rs | 108 +--- crates/re_query/benches/query_benchmark.rs | 38 +- crates/re_query/examples/range.rs | 27 +- crates/re_query/src/query.rs | 26 +- crates/re_query/tests/query_tests.rs | 49 +- crates/re_query/tests/range_tests.rs | 106 ++-- crates/re_sdk/src/msg_sender.rs | 49 +- rerun_py/src/arrow.rs | 18 +- rerun_py/src/python_bridge.rs | 87 +-- 29 files changed, 1482 insertions(+), 800 deletions(-) create mode 100644 crates/re_log_types/src/data_row.rs create mode 100644 crates/re_log_types/src/data_table.rs diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 8d7367d968bef..cd055537b0a91 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time use re_log_types::{ component_types::{InstanceKey, Rect2D}, datagen::{build_frame_nr, build_some_instances, build_some_rects}, - msg_bundle::{try_build_msg_bundle2, MsgBundle}, - Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline, + Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline, }; // --- @@ -27,28 +26,30 @@ const NUM_RECTS: i64 = 1; // --- Benchmarks --- +// TODO(cmc): need additional benches for full tables + fn insert(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); + let rows = build_rows(NUM_RECTS as usize); let mut group = c.benchmark_group("datastore/insert/batch/rects"); group.throughput(criterion::Throughput::Elements( (NUM_RECTS * NUM_FRAMES) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(Default::default(), InstanceKey::name(), msgs.iter())); + b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter())); }); } } fn latest_at_batch(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter()); + let rows = build_rows(NUM_RECTS as usize); + let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter()); let mut group = c.benchmark_group("datastore/latest_at/batch/rects"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("query", |b| { b.iter(|| { - let results = latest_messages_at(&store, Rect2D::name(), &[Rect2D::name()]); + let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]); let rects = results[0] .as_ref() .unwrap() @@ -70,27 +71,27 @@ fn latest_at_missing_components(c: &mut Criterion) { }; { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(config.clone(), InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/latest_at/missing_components"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("primary", |b| { b.iter(|| { let results = - latest_messages_at(&store, "non_existing_component".into(), &[Rect2D::name()]); + latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]); assert!(results[0].is_none()); }); }); } { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(config, InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(config, InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/latest_at/missing_components"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("secondaries", |b| { b.iter(|| { - let results = latest_messages_at( + let results = latest_data_at( &store, Rect2D::name(), &[ @@ -109,15 +110,15 @@ fn latest_at_missing_components(c: &mut Criterion) { fn range_batch(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/range/batch/rects"); group.throughput(criterion::Throughput::Elements( (NUM_RECTS * NUM_FRAMES) as _, )); group.bench_function("query", |b| { b.iter(|| { - let msgs = range_messages(&store, [Rect2D::name()]); + let msgs = range_data(&store, [Rect2D::name()]); for (cur_time, (time, results)) in msgs.enumerate() { let time = time.unwrap(); assert_eq!(cur_time as i64, time.as_i64()); @@ -146,31 +147,31 @@ criterion_main!(benches); // --- Helpers --- -fn build_messages(n: usize) -> Vec { +fn build_rows(n: usize) -> Vec { (0..NUM_FRAMES) .map(move |frame_idx| { - try_build_msg_bundle2( - MsgId::ZERO, + DataRow::from_cells2( + MsgId::random(), "rects", [build_frame_nr(frame_idx.into())], + n as _, (build_some_instances(n), build_some_rects(n)), ) - .unwrap() }) .collect() } -fn insert_messages<'a>( +fn insert_rows<'a>( config: DataStoreConfig, cluster_key: ComponentName, - msgs: impl Iterator, + rows: impl Iterator, ) -> DataStore { let mut store = DataStore::new(cluster_key, config); - msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap()); + rows.for_each(|row| store.insert_row(row).unwrap()); store } -fn latest_messages_at( +fn latest_data_at( store: &DataStore, primary: ComponentName, secondaries: &[ComponentName; N], @@ -185,7 +186,7 @@ fn latest_messages_at( store.get(secondaries, &row_indices) } -fn range_messages( +fn range_data( store: &DataStore, components: [ComponentName; N], ) -> impl Iterator, [Option>; N])> + '_ { diff --git a/crates/re_arrow_store/examples/dump_dataframe.rs b/crates/re_arrow_store/examples/dump_dataframe.rs index 7282ef4e8b47a..74b5e111ac693 100644 --- a/crates/re_arrow_store/examples/dump_dataframe.rs +++ b/crates/re_arrow_store/examples/dump_dataframe.rs @@ -4,7 +4,7 @@ //! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe //! ``` -use re_arrow_store::{test_bundle, DataStore}; +use re_arrow_store::{test_row, DataStore}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -25,51 +25,50 @@ fn main() { ]; for ent_path in &ent_paths { - let bundle1 = test_bundle!(ent_path @ [ - build_frame_nr(1.into()), build_log_time(Time::now()), - ] => [build_some_instances(2), build_some_rects(2)]); - store.insert_row(&bundle1).unwrap(); + let row1 = test_row!(ent_path @ [ + build_frame_nr(1.into()), build_log_time(Time::now()), + ] => 2; [build_some_instances(2), build_some_rects(2)]); + store.insert_row(&row1).unwrap(); } for ent_path in &ent_paths { - let bundle2 = test_bundle!(ent_path @ [ - build_frame_nr(2.into()) - ] => [build_some_instances(2), build_some_point2d(2)]); - store.insert_row(&bundle2).unwrap(); + let row2 = test_row!(ent_path @ [ + build_frame_nr(2.into()) + ] => 2; [build_some_instances(2), build_some_point2d(2)]); + store.insert_row(&row2).unwrap(); // Insert timelessly too! - let bundle2 = - test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]); - store.insert_row(&bundle2).unwrap(); + let row2 = test_row!(ent_path @ [] => 2; [build_some_instances(2), build_some_point2d(2)]); + store.insert_row(&row2).unwrap(); - let bundle3 = test_bundle!(ent_path @ [ - build_frame_nr(3.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert_row(&bundle3).unwrap(); + let row3 = test_row!(ent_path @ [ + build_frame_nr(3.into()), build_log_time(Time::now()), + ] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]); + store.insert_row(&row3).unwrap(); // Insert timelessly too! - let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert_row(&bundle3).unwrap(); + let row3 = test_row!(ent_path @ [] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]); + store.insert_row(&row3).unwrap(); } for ent_path in &ent_paths { - let bundle4_1 = test_bundle!(ent_path @ [ - build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(20..23), build_some_rects(3)]); - store.insert_row(&bundle4_1).unwrap(); + let row4_1 = test_row!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => 3; [build_some_instances_from(20..23), build_some_rects(3)]); + store.insert_row(&row4_1).unwrap(); - let bundle4_15 = test_bundle!(ent_path @ [ - build_frame_nr(4.into()), - ] => [build_some_instances_from(20..23), build_some_point2d(3)]); - store.insert_row(&bundle4_15).unwrap(); + let row4_15 = test_row!(ent_path @ [ + build_frame_nr(4.into()), + ] => 3; [build_some_instances_from(20..23), build_some_point2d(3)]); + store.insert_row(&row4_15).unwrap(); - let bundle4_2 = test_bundle!(ent_path @ [ - build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..28), build_some_rects(3)]); - store.insert_row(&bundle4_2).unwrap(); + let row4_2 = test_row!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => 3; [build_some_instances_from(25..28), build_some_rects(3)]); + store.insert_row(&row4_2).unwrap(); - let bundle4_25 = test_bundle!(ent_path @ [ - build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..28), build_some_point2d(3)]); - store.insert_row(&bundle4_25).unwrap(); + let row4_25 = test_row!(ent_path @ [ + build_frame_nr(4.into()), build_log_time(Time::now()), + ] => 3; [build_some_instances_from(25..28), build_some_point2d(3)]); + store.insert_row(&row4_25).unwrap(); } let df = store.to_dataframe(); diff --git a/crates/re_arrow_store/examples/latest_component.rs b/crates/re_arrow_store/examples/latest_component.rs index bf35f99c3dcfc..ccfc7f5c86855 100644 --- a/crates/re_arrow_store/examples/latest_component.rs +++ b/crates/re_arrow_store/examples/latest_component.rs @@ -5,7 +5,7 @@ //! ``` use re_arrow_store::polars_util::latest_component; -use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline}; use re_log_types::component_types::Rect2D; use re_log_types::datagen::build_some_rects; use re_log_types::{ @@ -19,11 +19,11 @@ fn main() { let ent_path = EntityPath::from("my/entity"); - let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); diff --git a/crates/re_arrow_store/examples/latest_components.rs b/crates/re_arrow_store/examples/latest_components.rs index 1b78b54471524..89abf77719581 100644 --- a/crates/re_arrow_store/examples/latest_components.rs +++ b/crates/re_arrow_store/examples/latest_components.rs @@ -6,7 +6,7 @@ use polars_core::prelude::*; use re_arrow_store::polars_util::latest_components; -use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, @@ -18,11 +18,11 @@ fn main() { let ent_path = EntityPath::from("my/entity"); - let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let df = latest_components( diff --git a/crates/re_arrow_store/examples/range_components.rs b/crates/re_arrow_store/examples/range_components.rs index 95e9cc27a8631..9d4130a1a534d 100644 --- a/crates/re_arrow_store/examples/range_components.rs +++ b/crates/re_arrow_store/examples/range_components.rs @@ -5,7 +5,7 @@ //! ``` use polars_core::prelude::JoinType; -use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange}; +use re_arrow_store::{polars_util, test_row, DataStore, RangeQuery, TimeRange}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, @@ -22,26 +22,26 @@ fn main() { let frame3 = 3.into(); let frame4 = 4.into(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 4; [build_some_point2d(4)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 1; [build_some_point2d(1)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_point2d(3)]); + store.insert_row(&row).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into())); diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 944ff5f36e3c0..6a24abe2fff21 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -15,6 +15,8 @@ use crate::{ PersistentIndexTable, RowIndex, }; +// TODO: all of this stuff should be defined by Data{Cell,Row,Table}, not the store + // --- impl DataStore { diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 67e032cc3381c..07fe02e1a5c0d 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -8,10 +8,8 @@ use parking_lot::RwLock; use re_log::{debug, trace}; use re_log_types::{ - component_types::InstanceKey, - msg_bundle::{wrap_in_listarray, MsgBundle}, - ComponentName, DataCell, DataCellError, EntityPath, MsgId, TimeInt, TimePoint, TimeRange, - Timeline, + component_types::InstanceKey, Component, ComponentName, DataCell, DataCellError, DataRow, + DataTable, EntityPath, MsgId, TimeInt, TimePoint, TimeRange, Timeline, }; use crate::{ @@ -28,7 +26,7 @@ use crate::{ #[derive(thiserror::Error, Debug)] pub enum WriteError { - #[error("Error with the underlying data cell")] + #[error("Error with one or more the underlying data cells")] DataCell(#[from] DataCellError), // Clustering key @@ -37,23 +35,10 @@ pub enum WriteError { #[error( "The cluster component must be increasingly sorted and not contain \ - any duplicates, got {0:?}" + any duplicates, got {0:?}" )] InvalidClusteringComponent(DataCell), - // Instances - #[error( - "All components within a row must have the same number of instances as the \ - cluster component, got {cluster_comp}={cluster_comp_nb_instances} vs. \ - {key}={num_instances}" - )] - MismatchedInstances { - cluster_comp: ComponentName, - cluster_comp_nb_instances: u32, - key: ComponentName, - num_instances: u32, - }, - // Misc #[error("Other error")] Other(#[from] anyhow::Error), @@ -62,6 +47,18 @@ pub enum WriteError { pub type WriteResult = ::std::result::Result; impl DataStore { + pub fn insert_table(&mut self, table: &DataTable) -> WriteResult<()> { + // TODO: explain that the magic of batching is in how the data is all in the same place + // TODO: could use some coalescing server-side too? + for row in table.as_rows() { + // TODO: should we fail? + self.insert_row(&row)?; + } + + Ok(()) + } + + // TODO: update /// Inserts a [`MsgBundle`]'s worth of components into the datastore. /// /// * All components within a single row must share the same number of instances. @@ -69,41 +66,31 @@ impl DataStore { /// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated /// based on the length of the components in the payload, in the form of an array of /// monotonically increasing u64s going from `0` to `N-1`. - pub fn insert_row(&mut self, msg: &MsgBundle) -> WriteResult<()> { + pub fn insert_row(&mut self, row: &DataRow) -> WriteResult<()> { // TODO(cmc): kind & insert_id need to somehow propagate through the span system. self.insert_id += 1; - let MsgBundle { - msg_id, - entity_path: ent_path, - time_point, - cells, - } = msg; - - if cells.is_empty() { + if row.num_components() == 0 { return Ok(()); } crate::profile_function!(); - let ent_path_hash = ent_path.hash(); + let DataRow { + row_id, + timepoint, + entity_path: ent_path, + num_instances: _, + cells, + } = row; - // TODO(#1619): remove this thing when removing MsgBundle. - // - // Effectively the same thing as having a non-unit length batch, except it's really not - // worth more than an assertion since: - // - A) `MsgBundle` should already guarantee this - // - B) this limitation should be gone soon enough - debug_assert!( - msg.cells.iter().map(|cell| cell.component()).all_unique(), - "cannot insert same component multiple times, this is equivalent to multiple rows", - ); + let ent_path_hash = ent_path.hash(); trace!( kind = "insert", id = self.insert_id, cluster_key = %self.cluster_key, - timelines = ?time_point.iter() + timelines = ?timepoint.iter() .map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time))) .collect::>(), entity = %ent_path, @@ -111,15 +98,15 @@ impl DataStore { "insertion started..." ); - let cluster_comp_pos = cells + let cluster_cell_pos = cells .iter() .find_position(|cell| cell.component() == self.cluster_key) .map(|(pos, _)| pos); - if time_point.is_timeless() { + if timepoint.is_timeless() { let mut row_indices = IntMap::default(); - self._insert_timeless_row(cluster_comp_pos, cells, &mut row_indices)?; + self._insert_timeless_row(cluster_cell_pos, cells, &mut row_indices)?; let index = self .timeless_indices @@ -129,9 +116,9 @@ impl DataStore { } else { let mut row_indices = IntMap::default(); - self._insert_row(time_point, cluster_comp_pos, cells, &mut row_indices)?; + self._insert_row(timepoint, cluster_cell_pos, cells, &mut row_indices)?; - for (timeline, time) in time_point.iter() { + for (timeline, time) in timepoint.iter() { let ent_path = ent_path.clone(); // shallow let index = self .indices @@ -141,22 +128,22 @@ impl DataStore { } } - // This is valuable information, even for a timeless ! - self.messages.insert(*msg_id, time_point.clone()); + // This is valuable information, even for a timeless timepoint! + self.messages.insert(*row_id, timepoint.clone()); Ok(()) } fn _insert_timeless_row( &mut self, - cluster_comp_pos: Option, + cluster_cell_pos: Option, cells: &[DataCell], row_indices: &mut IntMap, ) -> WriteResult<()> { crate::profile_function!(); - let (cluster_row_idx, cluster_len) = - self.get_or_create_cluster_component(cluster_comp_pos, cells, &TimePoint::default())?; + let cluster_row_idx = + self.get_or_create_cluster_component(cluster_cell_pos, cells, &TimePoint::default())?; // Always insert the cluster component. row_indices.insert(self.cluster_key, cluster_row_idx); @@ -179,16 +166,6 @@ impl DataStore { .filter(|cell| cell.component() != self.cluster_key) { let component = cell.component(); - let num_instances = cell.num_instances(); - - if num_instances != cluster_len { - return Err(WriteError::MismatchedInstances { - cluster_comp: self.cluster_key, - cluster_comp_nb_instances: cluster_len, - key: component, - num_instances, - }); - } let table = self .timeless_components @@ -205,14 +182,14 @@ impl DataStore { fn _insert_row( &mut self, time_point: &TimePoint, - cluster_comp_pos: Option, + cluster_cell_pos: Option, cells: &[DataCell], row_indices: &mut IntMap, ) -> WriteResult<()> { crate::profile_function!(); - let (cluster_row_idx, cluster_len) = - self.get_or_create_cluster_component(cluster_comp_pos, cells, time_point)?; + let cluster_row_idx = + self.get_or_create_cluster_component(cluster_cell_pos, cells, time_point)?; // Always insert the cluster component. row_indices.insert(self.cluster_key, cluster_row_idx); @@ -235,16 +212,6 @@ impl DataStore { .filter(|cell| cell.component() != self.cluster_key) { let component = cell.component(); - let num_instances = cell.num_instances(); - - if num_instances != cluster_len { - return Err(WriteError::MismatchedInstances { - cluster_comp: self.cluster_key, - cluster_comp_nb_instances: cluster_len, - key: component, - num_instances, - }); - } let table = self .components @@ -266,10 +233,10 @@ impl DataStore { /// deduplication. fn get_or_create_cluster_component( &mut self, - cluster_comp_pos: Option, + cluster_cell_pos: Option, cells: &[DataCell], time_point: &TimePoint, - ) -> WriteResult<(RowIndex, u32)> { + ) -> WriteResult { crate::profile_function!(); enum ClusterData<'a> { @@ -278,11 +245,11 @@ impl DataStore { UserData(&'a DataCell), } - let (cluster_len, cluster_data) = if let Some(cluster_comp_pos) = cluster_comp_pos { + let (cluster_len, cluster_data) = if let Some(cluster_cell_pos) = cluster_cell_pos { // We found a component with a name matching the cluster key's, let's make sure it's // valid (dense, sorted, no duplicates) and use that if so. - let cluster_cell = &cells[cluster_comp_pos]; + let cluster_cell = &cells[cluster_cell_pos]; // Clustering component must be dense. if !cluster_cell.is_dense() { @@ -329,7 +296,7 @@ impl DataStore { }; match cluster_data { - ClusterData::Cached(row_idx) => Ok((row_idx, cluster_len)), + ClusterData::Cached(row_idx) => Ok(row_idx), ClusterData::GenData(cell) => { // We had to generate a cluster component of the given length for the first time, // let's store it forever. @@ -344,7 +311,7 @@ impl DataStore { self.cluster_comp_cache.insert(cluster_len, row_idx); - Ok((row_idx, cluster_len)) + Ok(row_idx) } ClusterData::UserData(cell) => { // If we didn't hit the cache, then we have to insert this cluster component in @@ -366,7 +333,7 @@ impl DataStore { table.push_cell(&self.config, time_point, cell) }; - Ok((row_idx, cluster_len)) + Ok(row_idx) } } } @@ -917,8 +884,7 @@ impl PersistentComponentTable { /// `datatype` must be the type of the component itself, devoid of any wrapping layers /// (i.e. _not_ a `ListArray<...>`!). fn new(name: ComponentName, datatype: &DataType) -> Self { - // TODO(cmc): think about this when implementing deletion. - let chunks = vec![wrap_in_listarray(new_empty_array(datatype.clone())).to_boxed()]; + let chunks = vec![DataCell::from_arrow_empty(name, datatype.clone()).as_arrow_monolist()]; let total_rows = chunks.iter().map(|values| values.len() as u64).sum(); let total_size_bytes = chunks .iter() @@ -1059,8 +1025,9 @@ impl ComponentBucket { pub fn new(name: ComponentName, datatype: &DataType, row_offset: u64) -> Self { // If this is the first bucket of this table, we need to insert an empty list at // row index #0! + // TODO(cmc): this needs to go let chunks = if row_offset == 0 { - vec![wrap_in_listarray(new_empty_array(datatype.clone())).to_boxed()] + vec![DataCell::from_arrow_empty(name, datatype.clone()).as_arrow_monolist()] } else { vec![] }; diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index b739d795fd508..870624782a2cf 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -4,24 +4,24 @@ use crate::DataStoreConfig; #[doc(hidden)] #[macro_export] -macro_rules! test_bundle { - ($entity:ident @ $frames:tt => [$c0:expr $(,)*]) => { - ::re_log_types::msg_bundle::try_build_msg_bundle1( +macro_rules! test_row { + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => { + ::re_log_types::DataRow::from_cells1( ::re_log_types::MsgId::random(), $entity.clone(), $frames, + $n, $c0, ) - .unwrap() }; - ($entity:ident @ $frames:tt => [$c0:expr, $c1:expr $(,)*]) => { - re_log_types::msg_bundle::try_build_msg_bundle2( + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => { + ::re_log_types::DataRow::from_cells2( ::re_log_types::MsgId::random(), $entity.clone(), $frames, + $n, ($c0, $c1), ) - .unwrap() }; } diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index d73ed083bbb0b..74ec6a8a76409 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -7,7 +7,7 @@ use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; use rand::Rng; use re_arrow_store::{ - test_bundle, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, WriteError, + test_row, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, WriteError, }; use re_log_types::{ component_types::InstanceKey, @@ -32,12 +32,12 @@ fn write_errors() { } let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let row = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_sparse_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert_row(&bundle), + store.insert_row(&row), Err(WriteError::SparseClusteringComponent(_)), )); } @@ -53,38 +53,26 @@ fn write_errors() { let mut store = DataStore::new(InstanceKey::name(), Default::default()); { - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let row = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_unsorted_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert_row(&bundle), + store.insert_row(&row), Err(WriteError::InvalidClusteringComponent(_)), )); } { - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let row = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_duped_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert_row(&bundle), + store.insert_row(&row), Err(WriteError::InvalidClusteringComponent(_)), )); } } - - { - let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ - build_some_instances(4), build_some_point2d(3) - ]); - assert!(matches!( - store.insert_row(&bundle), - Err(WriteError::MismatchedInstances { .. }), - )); - } } // --- @@ -109,11 +97,9 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) { let num_instances = 3; store - .insert_row( - &test_bundle!(ent_path @ [build_log_time(now), build_frame_nr(frame40)] => [ - build_some_instances(num_instances), - ]), - ) + .insert_row(&test_row!(ent_path @ [ + build_log_time(now), build_frame_nr(frame40), + ] => num_instances; [build_some_instances(num_instances as _)])) .unwrap(); if let err @ Err(_) = store.sanity_check() { @@ -246,9 +232,9 @@ fn range_join_across_single_row_impl(store: &mut DataStore) { let points = build_some_point2d(3); let colors = build_some_colors(3); - let bundle = - test_bundle!(ent_path @ [build_frame_nr(42.into())] => [points.clone(), colors.clone()]); - store.insert_row(&bundle).unwrap(); + let row = + test_row!(ent_path @ [build_frame_nr(42.into())] => 3; [points.clone(), colors.clone()]); + store.insert_row(&row).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = re_arrow_store::RangeQuery::new( @@ -310,10 +296,12 @@ fn gc_correct() { for i in 0..num_ents { let ent_path = EntityPath::from(format!("this/that/{i}")); let num_instances = rng.gen_range(0..=1_000); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ - build_some_colors(num_instances), + let row = test_row!(ent_path @ [ + build_frame_nr(frame_nr.into()), + ] => num_instances; [ + build_some_colors(num_instances as _), ]); - store.insert_row(&bundle).unwrap(); + store.insert_row(&row).unwrap(); } } diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 018482263d831..b07f0c81c174b 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -6,12 +6,14 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use arrow2::array::{Array, UInt64Array}; +use itertools::Itertools as _; use nohash_hasher::IntMap; use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; use rand::Rng; use re_arrow_store::{ - polars_util, test_bundle, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, + polars_util, test_row, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, TimeRange, }; use re_log_types::{ @@ -21,10 +23,12 @@ use re_log_types::{ build_some_point2d, build_some_rects, }, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::MsgBundle, - Component as _, ComponentName, DataCell, EntityPath, MsgId, TimeType, Timeline, + Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType, + Timeline, }; +// TODO(cmc): introduce batching in the testing matrix + // --- LatestComponentsAt --- #[test] @@ -79,7 +83,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -89,25 +93,23 @@ fn all_components() { Point2D::name(), // added by us Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [ - build_frame_nr(frame1), - ] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [ + let row = test_row!(ent_path @ [ build_frame_nr(frame2), - ] => [build_some_rects(2), build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + ] => 2; [build_some_rects(2), build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -147,7 +149,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -157,26 +159,26 @@ fn all_components() { Rect2D::name(), // ⚠ inherited before the buckets got split apart! Point2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_instances(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_instances(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -219,7 +221,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -229,31 +231,31 @@ fn all_components() { Point2D::name(), // added by us but not contained in the second bucket Rect2D::name(), // added by use cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_point2d(2)]); - store.insert_row(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -294,33 +296,32 @@ fn latest_at_impl(store: &mut DataStore) { let frame3: TimeInt = 3.into(); let frame4: TimeInt = 4.into(); - // helper to insert a bundle both as a temporal and timeless payload - let insert = |store: &mut DataStore, bundle| { + // helper to insert a row both as a temporal and timeless payload + let insert = |store: &mut DataStore, row| { // insert temporal - store.insert_row(bundle).unwrap(); + store.insert_row(row).unwrap(); // insert timeless - let mut bundle_timeless = bundle.clone(); - bundle_timeless.time_point = Default::default(); - store.insert_row(&bundle_timeless).unwrap(); + let mut row_timeless = (*row).clone(); + row_timeless.timepoint = Default::default(); + store.insert_row(&row_timeless).unwrap(); }; let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); - let bundle1 = - test_bundle!(ent_path @ [build_frame_nr(frame1)] => [instances1.clone(), colors1]); - insert(store, &bundle1); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]); + insert(store, &row1); let points2 = build_some_point2d(3); - let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [instances1, points2]); - insert(store, &bundle2); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]); + insert(store, &row2); let points3 = build_some_point2d(10); - let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); - insert(store, &bundle3); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]); + insert(store, &row3); let colors4 = build_some_colors(5); - let bundle4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [colors4]); - insert(store, &bundle4); + let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [colors4]); + insert(store, &row4); if let err @ Err(_) = store.sanity_check() { store.sort_indices_if_needed(); @@ -328,50 +329,49 @@ fn latest_at_impl(store: &mut DataStore) { err.unwrap(); } - let mut assert_latest_components = - |frame_nr: TimeInt, bundles: &[(ComponentName, &MsgBundle)]| { - let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); - let components_all = &[ColorRGBA::name(), Point2D::name()]; + let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let components_all = &[ColorRGBA::name(), Point2D::name()]; - let df = polars_util::latest_components( - store, - &LatestAtQuery::new(timeline_frame_nr, frame_nr), - &ent_path, - components_all, - &JoinType::Outer, - ) - .unwrap(); + let df = polars_util::latest_components( + store, + &LatestAtQuery::new(timeline_frame_nr, frame_nr), + &ent_path, + components_all, + &JoinType::Outer, + ) + .unwrap(); - let df_expected = joint_df(store.cluster_key(), bundles); + let df_expected = joint_df(store.cluster_key(), rows); - store.sort_indices_if_needed(); - assert_eq!(df_expected, df, "{store}"); - }; + store.sort_indices_if_needed(); + assert_eq!(df_expected, df, "{store}"); + }; // TODO(cmc): bring back some log_time scenarios assert_latest_components( frame0, - &[(ColorRGBA::name(), &bundle4), (Point2D::name(), &bundle3)], // timeless + &[(ColorRGBA::name(), &row4), (Point2D::name(), &row3)], // timeless ); assert_latest_components( frame1, &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle3), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row3), // timeless ], ); assert_latest_components( frame2, - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle2)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row2)], ); assert_latest_components( frame3, - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row3)], ); assert_latest_components( frame4, - &[(ColorRGBA::name(), &bundle4), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4), (Point2D::name(), &row3)], ); } @@ -399,54 +399,52 @@ fn range_impl(store: &mut DataStore) { let frame4: TimeInt = 4.into(); let frame5: TimeInt = 5.into(); - // helper to insert a bundle both as a temporal and timeless payload - let insert = |store: &mut DataStore, bundle| { + // helper to insert a row both as a temporal and timeless payload + let insert = |store: &mut DataStore, row| { // insert temporal - store.insert_row(bundle).unwrap(); + store.insert_row(row).unwrap(); // insert timeless - let mut bundle_timeless = bundle.clone(); - bundle_timeless.time_point = Default::default(); - store.insert_row(&bundle_timeless).unwrap(); + let mut row_timeless = (*row).clone(); + row_timeless.timepoint = Default::default(); + store.insert_row(&row_timeless).unwrap(); }; let insts1 = build_some_instances(3); let colors1 = build_some_colors(3); - let bundle1 = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [insts1.clone(), colors1]); - insert(store, &bundle1); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [insts1.clone(), colors1]); + insert(store, &row1); let points2 = build_some_point2d(3); - let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [insts1, points2]); - insert(store, &bundle2); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [insts1, points2]); + insert(store, &row2); let points3 = build_some_point2d(10); - let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); - insert(store, &bundle3); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]); + insert(store, &row3); let insts4_1 = build_some_instances_from(20..25); let colors4_1 = build_some_colors(5); - let bundle4_1 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_1, colors4_1]); - insert(store, &bundle4_1); + let row4_1 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_1, colors4_1]); + insert(store, &row4_1); let insts4_2 = build_some_instances_from(25..30); let colors4_2 = build_some_colors(5); - let bundle4_2 = - test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_2.clone(), colors4_2]); - insert(store, &bundle4_2); + let row4_2 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_2.clone(), colors4_2]); + insert(store, &row4_2); let points4_25 = build_some_point2d(5); - let bundle4_25 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_2, points4_25]); - insert(store, &bundle4_25); + let row4_25 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_2, points4_25]); + insert(store, &row4_25); let insts4_3 = build_some_instances_from(30..35); let colors4_3 = build_some_colors(5); - let bundle4_3 = - test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_3.clone(), colors4_3]); - insert(store, &bundle4_3); + let row4_3 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_3.clone(), colors4_3]); + insert(store, &row4_3); let points4_4 = build_some_point2d(5); - let bundle4_4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_3, points4_4]); - insert(store, &bundle4_4); + let row4_4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_3, points4_4]); + insert(store, &row4_4); if let err @ Err(_) = store.sanity_check() { store.sort_indices_if_needed(); @@ -454,7 +452,7 @@ fn range_impl(store: &mut DataStore) { err.unwrap(); } - // Each entry in `bundles_at_times` corresponds to a dataframe that's expected to be returned + // Each entry in `rows_at_times` corresponds to a dataframe that's expected to be returned // by the range query. // A single timepoint might have several of those! That's one of the behaviors specific to // range queries. @@ -462,16 +460,16 @@ fn range_impl(store: &mut DataStore) { let mut assert_range_components = |time_range: TimeRange, components: [ComponentName; 2], - bundles_at_times: &[(Option, &[(ComponentName, &MsgBundle)])]| { + rows_at_times: &[(Option, &[(ComponentName, &DataRow)])]| { let mut expected_timeless = Vec::::new(); let mut expected_at_times: IntMap> = Default::default(); - for (time, bundles) in bundles_at_times { + for (time, rows) in rows_at_times { if let Some(time) = time { let dfs = expected_at_times.entry(*time).or_default(); - dfs.push(joint_df(store.cluster_key(), bundles)); + dfs.push(joint_df(store.cluster_key(), rows)); } else { - expected_timeless.push(joint_df(store.cluster_key(), bundles)); + expected_timeless.push(joint_df(store.cluster_key(), rows)); } } @@ -510,7 +508,7 @@ fn range_impl(store: &mut DataStore) { dfs_processed += 1; } - let dfs_processed_expected = bundles_at_times.len(); + let dfs_processed_expected = rows_at_times.len(); assert_eq!(dfs_processed_expected, dfs_processed); }; @@ -524,16 +522,13 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], ), // timeless ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ], @@ -545,8 +540,8 @@ fn range_impl(store: &mut DataStore) { ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), // ], @@ -557,7 +552,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame2), - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle2)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row2)], ), // ], ); @@ -567,22 +562,19 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame3), - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -592,10 +584,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], // !!! ), // ], ); @@ -608,10 +597,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // timeless ], ); @@ -622,13 +608,13 @@ fn range_impl(store: &mut DataStore) { ( Some(frame1), &[ - (Point2D::name(), &bundle4_4), // timeless - (ColorRGBA::name(), &bundle1), + (Point2D::name(), &row4_4), // timeless + (ColorRGBA::name(), &row1), ], ), ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), // ], ); @@ -638,11 +624,11 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ], ); @@ -652,21 +638,15 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -676,10 +656,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // ], ); @@ -692,32 +669,26 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], ), // timeless ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -730,32 +701,23 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // timeless ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -766,43 +728,37 @@ fn range_impl(store: &mut DataStore) { TimeRange::new(TimeInt::MIN, TimeInt::MAX), [ColorRGBA::name(), Point2D::name()], &[ - (None, &[(ColorRGBA::name(), &bundle1)]), + (None, &[(ColorRGBA::name(), &row1)]), ( None, - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( None, - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( None, - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -815,47 +771,35 @@ fn range_impl(store: &mut DataStore) { &[ ( None, - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( None, - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( None, - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( None, - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -863,17 +807,16 @@ fn range_impl(store: &mut DataStore) { // --- Common helpers --- -/// Given a list of bundles, crafts a `latest_components`-looking dataframe. -// TODO(#1692): use Data{Cell,Row,Table} polars extensions -fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) -> DataFrame { - let df = bundles +/// Given a list of rows, crafts a `latest_components`-looking dataframe. +fn joint_df(cluster_key: ComponentName, rows: &[(ComponentName, &DataRow)]) -> DataFrame { + let df = rows .iter() - .map(|(component, bundle)| { - let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) { - Series::try_from((cluster_key.as_str(), bundle.cells[idx].as_arrow_monolist())) + .map(|(component, row)| { + let cluster_comp = if let Some(idx) = row.find_cell(&cluster_key) { + Series::try_from((cluster_key.as_str(), row.cells[idx].as_arrow_monolist())) .unwrap() } else { - let num_instances = bundle.num_instances(); + let num_instances = row.num_instances(); Series::try_from(( cluster_key.as_str(), DataCell::from_component::(0..num_instances as u64) @@ -882,14 +825,11 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) .unwrap() }; - let comp_idx = bundle.find_component(component).unwrap(); + let comp_idx = row.find_cell(component).unwrap(); let df = DataFrame::new(vec![ cluster_comp, - Series::try_from(( - component.as_str(), - bundle.cells[comp_idx].as_arrow_monolist(), - )) - .unwrap(), + Series::try_from((component.as_str(), row.cells[comp_idx].as_arrow_monolist())) + .unwrap(), ]) .unwrap(); @@ -930,10 +870,12 @@ fn gc_impl(store: &mut DataStore) { let frames = (0..num_frames).filter(|_| rand::thread_rng().gen()); for frame_nr in frames { let num_instances = rng.gen_range(0..=1_000); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ - build_some_rects(num_instances), + let row = test_row!(ent_path @ [ + build_frame_nr(frame_nr.into()) + ] => num_instances; [ + build_some_rects(num_instances as _), ]); - store.insert_row(&bundle).unwrap(); + store.insert_row(&row).unwrap(); } } diff --git a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index 1307578781e46..b81e09600a3d7 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig}; use re_log_types::{ component_types::InstanceKey, datagen::{build_frame_nr, build_some_instances}, - msg_bundle::MsgBundle, - Component as _, EntityPath, MsgId, TimePoint, + Component as _, DataRow, EntityPath, MsgId, TimePoint, }; // --- Internals --- @@ -51,23 +50,25 @@ fn pathological_bucket_topology() { let ent_path = EntityPath::from("this/that"); let num_instances = 1; - let time_point = TimePoint::from([build_frame_nr(frame_nr.into())]); + let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); for _ in 0..num { - let msg = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point.clone(), - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint.clone(), + num_instances, + build_some_instances(num_instances as _), ); - store_forward.insert_row(&msg).unwrap(); + store_forward.insert_row(&row).unwrap(); - let msg = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point.clone(), - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint.clone(), + num_instances, + build_some_instances(num_instances as _), ); - store_backward.insert_row(&msg).unwrap(); + store_backward.insert_row(&row).unwrap(); } } @@ -79,24 +80,26 @@ fn pathological_bucket_topology() { let ent_path = EntityPath::from("this/that"); let num_instances = 1; - let msgs = range + // TODO(cmc): to update once batching lands + let rows = range .map(|frame_nr| { - let time_point = TimePoint::from([build_frame_nr(frame_nr.into())]); - MsgBundle::new( + let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); + DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point, - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint, + num_instances, + build_some_instances(num_instances as _), ) }) .collect::>(); - msgs.iter() - .for_each(|msg| store_forward.insert_row(msg).unwrap()); + rows.iter() + .for_each(|row| store_forward.insert_row(row).unwrap()); - msgs.iter() + rows.iter() .rev() - .for_each(|msg| store_backward.insert_row(msg).unwrap()); + .for_each(|row| store_backward.insert_row(row).unwrap()); } store_repeated_frame(1000, 10, &mut store_forward, &mut store_backward); diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 29a8386e3dc48..9b023d4203667 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -48,7 +48,7 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, MsgId}; +use re_log_types::{entity_path, DataRow, DataTable, MsgId}; fn main() { log_messages(); @@ -57,7 +57,6 @@ fn main() { fn log_messages() { use re_log_types::{ datagen::{build_frame_nr, build_some_point2d}, - msg_bundle::try_build_msg_bundle1, ArrowMsg, LogMsg, TimeInt, TimePoint, Timeline, }; @@ -107,13 +106,17 @@ fn log_messages() { { let used_bytes_start = live_bytes(); let msg_bundle = Box::new( - try_build_msg_bundle1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - build_some_point2d(1), + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + build_some_point2d(1), + )], ) - .unwrap(), + .into_msg_bundle(), ); let msg_bundle_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); @@ -129,13 +132,17 @@ fn log_messages() { { let used_bytes_start = live_bytes(); let msg_bundle = Box::new( - try_build_msg_bundle1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - build_some_point2d(NUM_POINTS), + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + NUM_POINTS as _, + build_some_point2d(NUM_POINTS), + )], ) - .unwrap(), + .into_msg_bundle(), ); let msg_bundle_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index ad9c5b71823f3..e7592ef9a42de 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -4,9 +4,9 @@ use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt}; use re_log_types::{ component_types::InstanceKey, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle, - ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, EntityPath, - EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, - Timeline, + ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, + EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, + TimePoint, Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -78,41 +78,52 @@ impl EntityDb { fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?; + let table = DataTable::from_msg_bundle(msg_bundle); - for (&timeline, &time_int) in msg_bundle.time_point.iter() { + // TODO(cmc): batching + for row in table.as_rows() { + // TODO: should that fail? + self.try_add_data_row(&row)?; + } + + Ok(()) + } + + fn try_add_data_row(&mut self, row: &DataRow) -> Result<(), Error> { + for (&timeline, &time_int) in row.timepoint().iter() { self.times_per_timeline.insert(timeline, time_int); } - self.register_entity_path(&msg_bundle.entity_path); + self.register_entity_path(&row.entity_path); - for cell in &msg_bundle.cells { - let component_path = - ComponentPath::new(msg_bundle.entity_path.clone(), cell.component()); + // TODO(cmc): batching + for cell in row.cells() { + let component_path = ComponentPath::new(row.entity_path().clone(), cell.component()); if cell.component() == MsgId::name() { continue; } - let pending_clears = self - .tree - .add_data_msg(&msg_bundle.time_point, &component_path); + let pending_clears = self.tree.add_data_msg(row.timepoint(), &component_path); for (msg_id, time_point) in pending_clears { // Create and insert an empty component into the arrow store // TODO(jleibs): Faster empty-array creation let cell = DataCell::from_arrow_empty(cell.component(), cell.datatype().clone()); - let msg_bundle = MsgBundle::new( + + let row = DataRow::from_cells1( msg_id, - msg_bundle.entity_path.clone(), + row.entity_path.clone(), time_point.clone(), - vec![cell], + cell.num_instances(), + cell, ); - self.data_store.insert_row(&msg_bundle).ok(); + self.data_store.insert_row(&row).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(&time_point, &component_path); } } - self.data_store.insert_row(&msg_bundle).map_err(Into::into) + self.data_store.insert_row(row).map_err(Into::into) } fn add_path_op(&mut self, msg_id: MsgId, time_point: &TimePoint, path_op: &PathOp) { @@ -127,13 +138,14 @@ impl EntityDb { // TODO(jleibs): Faster empty-array creation let cell = DataCell::from_arrow_empty(component_path.component_name, data_type.clone()); - let msg_bundle = MsgBundle::new( + let row = DataRow::from_cells1( msg_id, component_path.entity_path.clone(), time_point.clone(), - vec![cell], + cell.num_instances(), + cell, ); - self.data_store.insert_row(&msg_bundle).ok(); + self.data_store.insert_row(&row).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(time_point, &component_path); } diff --git a/crates/re_log_types/benches/msg_encode_benchmark.rs b/crates/re_log_types/benches/msg_encode_benchmark.rs index 01812f74ddc2b..849e82399d2ca 100644 --- a/crates/re_log_types/benches/msg_encode_benchmark.rs +++ b/crates/re_log_types/benches/msg_encode_benchmark.rs @@ -7,8 +7,8 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, entity_path, - msg_bundle::{try_build_msg_bundle2, MsgBundle}, - ArrowMsg, Index, LogMsg, MsgId, + msg_bundle::MsgBundle, + ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -63,13 +63,17 @@ fn mono_points_arrow(c: &mut Criterion) { fn generate_message_bundles() -> Vec { (0..NUM_POINTS) .map(|i| { - try_build_msg_bundle2( + DataTable::from_rows( MsgId::ZERO, - entity_path!("points", Index::Sequence(i as _)), - [build_frame_nr(0.into())], - (build_some_point2d(1), build_some_colors(1)), + [DataRow::from_cells2( + MsgId::ZERO, + entity_path!("points", Index::Sequence(i as _)), + [build_frame_nr(0.into())], + 1, + (build_some_point2d(1), build_some_colors(1)), + )], ) - .unwrap() + .into_msg_bundle() }) .collect() } @@ -115,16 +119,22 @@ fn mono_points_arrow(c: &mut Criterion) { fn batch_points_arrow(c: &mut Criterion) { fn generate_message_bundles() -> Vec { - vec![try_build_msg_bundle2( - MsgId::ZERO, - entity_path!("points"), - [build_frame_nr(0.into())], - ( - build_some_point2d(NUM_POINTS), - build_some_colors(NUM_POINTS), - ), - ) - .unwrap()] + vec![ + DataTable::from_rows( + MsgId::ZERO, + [DataRow::from_cells2( + MsgId::ZERO, + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + ( + build_some_point2d(NUM_POINTS), + build_some_colors(NUM_POINTS), + ), + )], + ) + .into_msg_bundle(), // + ] } { diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 9ea0dbc00c234..90d5322531e66 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -117,7 +117,7 @@ mod tests { use super::{ArrowMsg, Chunk, MsgId, Schema}; use crate::{ datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - msg_bundle::try_build_msg_bundle2, + DataRow, }; #[test] @@ -162,19 +162,23 @@ mod tests { #[test] fn test_roundtrip_payload() { - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::ZERO, "world/rects", [build_frame_nr(0.into())], + 1, (build_some_point2d(1), build_some_rects(1)), - ) - .unwrap(); + ); + + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); // TODO(#1619): test the full roundtrip: // cell -> row -> table_in -> msg_in -> msg_out -> table_out // => msg_in == msg_out // => table_in == table_out - let msg_in: ArrowMsg = bundle.try_into().unwrap(); + let msg_in: ArrowMsg = msg_bundle.try_into().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); assert_eq!(msg_in, msg_out); diff --git a/crates/re_log_types/src/component_types/tensor.rs b/crates/re_log_types/src/component_types/tensor.rs index 6e5c2c5e1eb13..8ccd83f2d5c7c 100644 --- a/crates/re_log_types/src/component_types/tensor.rs +++ b/crates/re_log_types/src/component_types/tensor.rs @@ -764,12 +764,12 @@ fn test_arrow() { #[test] fn test_concat_and_slice() { - use crate::msg_bundle::wrap_in_listarray; use arrow2::array::ListArray; use arrow2::compute::concatenate::concatenate; - use arrow2_convert::{deserialize::TryIntoCollection, serialize::TryIntoArrow}; - let tensor1 = vec![Tensor { + use crate::DataCell; + + let tensor1 = Tensor { tensor_id: TensorId::random(), shape: vec![TensorDimension { size: 4, @@ -778,9 +778,9 @@ fn test_concat_and_slice() { data: TensorData::JPEG(vec![1, 2, 3, 4].into()), meaning: TensorDataMeaning::Unknown, meter: Some(1000.0), - }]; + }; - let tensor2 = vec![Tensor { + let tensor2 = Tensor { tensor_id: TensorId::random(), shape: vec![TensorDimension { size: 4, @@ -789,12 +789,10 @@ fn test_concat_and_slice() { data: TensorData::JPEG(vec![5, 6, 7, 8].into()), meaning: TensorDataMeaning::Unknown, meter: None, - }]; + }; - let array1: Box = tensor1.iter().try_into_arrow().unwrap(); - let list1 = wrap_in_listarray(array1).boxed(); - let array2: Box = tensor2.iter().try_into_arrow().unwrap(); - let list2 = wrap_in_listarray(array2).boxed(); + let list1 = DataCell::from_native(&[tensor1.clone()]).as_arrow_monolist(); + let list2 = DataCell::from_native(&[tensor2.clone()]).as_arrow_monolist(); let pre_concat = list1 .as_any() @@ -802,8 +800,10 @@ fn test_concat_and_slice() { .unwrap() .value(0); - let tensor_out: Vec = TryIntoCollection::try_into_collection(pre_concat).unwrap(); - + let tensor_out = DataCell::from_arrow(Tensor::name(), pre_concat) + .as_native::() + .next() + .unwrap(); assert_eq!(tensor1, tensor_out); let concat = concatenate(&[list1.as_ref(), list2.as_ref()]).unwrap(); @@ -814,7 +814,10 @@ fn test_concat_and_slice() { .unwrap() .value(1); - let tensor_out: Vec = TryIntoCollection::try_into_collection(slice).unwrap(); + let tensor_out = DataCell::from_arrow(Tensor::name(), slice) + .as_native::() + .next() + .unwrap(); - assert_eq!(tensor2[0], tensor_out[0]); + assert_eq!(tensor2, tensor_out); } diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 4886d611f266c..f749fc3494604 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -9,8 +9,12 @@ pub enum DataCellError { #[error("Unsupported datatype: {0:?}")] UnsupportedDatatype(arrow2::datatypes::DataType), - #[error("Could not serialize/deserialize component instances to/from Arrow")] + #[error("Could not serialize/deserialize data to/from Arrow")] Arrow(#[from] arrow2::error::Error), + + // Needed to handle TryFrom -> T + #[error("Infallible")] + Unreachable(#[from] std::convert::Infallible), } pub type DataCellResult = ::std::result::Result; @@ -33,11 +37,7 @@ pub type DataCellResult = ::std::result::Result; /// /// Consider this example: /// ```ignore -/// let points: &[Point2D] = &[ -/// [10.0, 10.0].into(), -/// [20.0, 20.0].into(), -/// [30.0, 30.0].into(), -/// ]; +/// let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into(), [30.0, 30.0].into()]; /// let cell = DataCell::from(points); /// // Or, alternatively: /// let cell = DataCell::from_component::([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]); diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs new file mode 100644 index 0000000000000..fe37cfdb9009a --- /dev/null +++ b/crates/re_log_types/src/data_row.rs @@ -0,0 +1,507 @@ +use ahash::HashSetExt; +use itertools::Itertools as _; +use nohash_hasher::IntSet; + +use crate::{ + Component, ComponentName, DataCell, DataCellError, DataTable, EntityPath, MsgId, TimePoint, +}; + +// TODO: any profile scopes needed? +// TODO: one of these days we need to check whether a single timepoint holds duplicated timelines +// though... + +// --- + +// TODO: test error paths + +#[derive(thiserror::Error, Debug)] +pub enum DataRowError { + #[error( + "Each cell must contain either 0, 1 or `num_instances` instances, \ + but cell '{component}' in '{entity_path}' holds {num_instances} instances \ + (expected {expected_num_instances}" + )] + WrongNumberOfInstances { + entity_path: EntityPath, + component: ComponentName, + expected_num_instances: u32, + num_instances: u32, + }, + + #[error( + "Same component type present multiple times within a single row: \ + '{component}' in '{entity_path}'" + )] + DupedComponent { + entity_path: EntityPath, + component: ComponentName, + }, + + #[error("Error with one or more the underlying data cells")] + DataCell(#[from] DataCellError), + + #[error("Could not serialize/deserialize data to/from Arrow")] + Arrow(#[from] arrow2::error::Error), + + // Needed to handle TryFrom -> T + #[error("Infallible")] + Unreachable(#[from] std::convert::Infallible), +} + +pub type DataRowResult = ::std::result::Result; + +// --- + +/// A row's worth of data, i.e. an event: a list of [`DataCell`]s associated with an auto-generated +/// [`EventId`], a user-specified [`TimePoint`] and [`EntityPath`], and an expected number of +/// instances. +/// +/// A `DataRow` can only be constructed from a collection of [`DataCell`]s. +/// +/// Behind the scenes, a `DataRow` is backed by a independent [`DataCell`]s which likely refer to +/// non-contiguous parts of the heap. +/// Cloning a `DataRow` is not too costly but needs to be avoided on the happy path. +/// +/// ## Field visibility +/// +/// To facilitate destructuring (`let DataRow { .. } = row`), all the fields in `DataRow` are +/// public. +/// +/// Reading or writing to any of these fields from outside this crate is considered undefined +/// behavior. +/// Use the appropriate getters and setters instead. +/// +/// ## Layout +/// +/// A row is a collection of cells where each cell must either be empty (a clear), unit-lengthed +/// (a splat) or [`num_instances`] long (standard): `[[C1, C1, C1], [], [C3], [C4, C4, C4], ...]`. +/// +/// Consider this example: +/// ```ignore +/// let num_instances = 2; +/// let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; +/// let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; +/// let labels: &[Label] = &[]; +/// let row = DataRow::from_cells3(event_id, timepoint, ent_path, num_instances, (points, colors, labels)); +/// ``` +/// +/// A row has no arrow representation nor datatype of its own, as it is merely a collection of +/// independent cells. +/// +/// Visualized in the context of a larger table, it is simply a row of cells: +/// ```text +/// ┌──────────────────────────────────┬─────────────────┬─────────────┐ +/// │ rerun.point2d ┆ rerun.colorrgba ┆ rerun.label │ +/// ╞══════════════════════════════════╪═════════════════╪═════════════╡ +/// │ [{x: 10, y: 10}, {x: 20, y: 20}] ┆ [2155905279] ┆ [] │ +/// └──────────────────────────────────┴─────────────────┴─────────────┘ +/// ``` +/// +/// ## Example +/// +/// ```rust +/// # use re_log_types::{ +/// # component_types::{ColorRGBA, Label, MsgId, Point2D}, +/// # DataRow, Timeline, +/// # }; +/// # +/// # let row_id = MsgId::ZERO; +/// # let timepoint = [ +/// # (Timeline::new_sequence("frame_nr"), 42.into()), // +/// # (Timeline::new_sequence("pouet"), 666.into()), // +/// # ]; +/// # +/// let num_instances = 2; +/// let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; +/// let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; +/// let labels: &[Label] = &[]; +/// +/// let row = DataRow::from_cells3( +/// row_id, +/// "a/b/c", +/// timepoint, +/// num_instances, +/// (points, colors, labels), +/// ); +/// eprintln!("{row}"); +/// ``` +#[derive(Debug, Clone)] +pub struct DataRow { + /// Auto-generated [`TUID`], uniquely identifying this event and keeping track of the client's + /// wall-clock. + // TODO(#1619): introduce EventId & BatchId + pub row_id: MsgId, + + /// User-specified [`TimePoint`] for this event. + pub timepoint: TimePoint, + + /// User-specified [`EntityPath`] for this event. + pub entity_path: EntityPath, + + /// The expected number of values (== component instances) in each cell. + /// + /// Each cell must have either: + /// - 0 instance (clear), + /// - 1 instance (splat), + /// - `num_instances` instances (standard). + pub num_instances: u32, + + /// The actual cells (== columns, == components). + pub cells: Vec, +} + +impl DataRow { + /// Builds a new `DataRow` from an iterable of [`DataCell`]s. + /// + /// Fails if: + /// - one or more cell isn't 0, 1 or `num_instances` long, + /// - two or more cells share the same component type. + pub fn try_from_cells( + row_id: MsgId, + timepoint: impl Into, + entity_path: impl Into, + num_instances: u32, + cells: impl IntoIterator, + ) -> DataRowResult { + let cells = cells.into_iter().collect_vec(); + + let entity_path = entity_path.into(); + let timepoint = timepoint.into(); + + let mut components = IntSet::with_capacity(cells.len()); + for cell in &cells { + let component = cell.component(); + + if !components.insert(component) { + return Err(DataRowError::DupedComponent { + entity_path, + component, + }); + } + + match cell.num_instances() { + 0 | 1 => {} + n if n == num_instances => {} + n => { + return Err(DataRowError::WrongNumberOfInstances { + entity_path, + component, + expected_num_instances: num_instances, + num_instances: n, + }) + } + } + } + + let mut this = Self { + row_id, + entity_path, + timepoint, + num_instances, + cells, + }; + + // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, + // we need to craft an array of `MsgId`s that matches the length of the other components. + // TODO(#1619): This goes away with batching & al + if !components.contains(&MsgId::name()) { + this.cells.push(DataCell::from_native( + vec![row_id; this.num_instances() as _].iter(), + )); + } + + Ok(this) + } + + /// Builds a new `DataRow` from an iterable of [`DataCell`]s. + /// + /// Panics if: + /// - one or more cell isn't 0, 1 or `num_instances` long, + /// - two or more cells share the same component type. + /// + /// See [`Self::try_from_cells`] for the fallible alternative. + pub fn from_cells( + row_id: MsgId, + timepoint: impl Into, + entity_path: impl Into, + num_instances: u32, + cells: impl IntoIterator, + ) -> Self { + Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap() + } + + #[inline] + pub fn into_table(self, table_id: MsgId) -> DataTable { + DataTable::from_rows(table_id, [self]) + } +} + +impl DataRow { + #[inline] + pub fn row_id(&self) -> MsgId { + self.row_id + } + + #[inline] + pub fn timepoint(&self) -> &TimePoint { + &self.timepoint + } + + #[inline] + pub fn entity_path(&self) -> &EntityPath { + &self.entity_path + } + + #[inline] + pub fn num_components(&self) -> usize { + self.cells.len() + } + + #[inline] + pub fn components(&self) -> impl ExactSizeIterator + '_ { + self.cells.iter().map(|cell| cell.component()) + } + + #[inline] + pub fn num_instances(&self) -> u32 { + self.num_instances + } + + #[inline] + pub fn cells(&self) -> &[DataCell] { + &self.cells + } + + #[inline] + pub fn into_cells(self) -> Vec { + self.cells + } + + /// Returns the index of the cell with the given component type in the row, if it exists. + /// + /// This is `O(n)`. + #[inline] + pub fn find_cell(&self, component: &ComponentName) -> Option { + self.cells + .iter() + .map(|cell| cell.component()) + .position(|name| name == *component) + } +} + +// --- + +impl DataRow { + pub fn from_cells1( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: C0, + ) -> DataRow + where + C0: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [into_cells.into()], + ) + } + + pub fn try_from_cells1( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: C0, + ) -> DataRowResult + where + C0: TryInto, + DataRowError: From<>::Error>, + { + Self::try_from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [into_cells.try_into()?], + ) + } + + pub fn from_cells2( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1), + ) -> DataRow + where + C0: Into, + C1: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.into(), // + into_cells.1.into(), // + ], + ) + } + + pub fn try_from_cells2( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1), + ) -> DataRowResult + where + C0: TryInto, + C1: TryInto, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + { + Self::try_from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.try_into()?, // + into_cells.1.try_into()?, // + ], + ) + } + + pub fn from_cells3( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1, C2), + ) -> DataRow + where + C0: Into, + C1: Into, + C2: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.into(), // + into_cells.1.into(), // + into_cells.2.into(), // + ], + ) + } + + pub fn try_from_cells3( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1, C2), + ) -> DataRowResult + where + C0: TryInto, + C1: TryInto, + C2: TryInto, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + { + Self::try_from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.try_into()?, // + into_cells.1.try_into()?, // + into_cells.2.try_into()?, // + ], + ) + } +} + +// --- + +impl std::fmt::Display for DataRow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "Row #{} @ '{}'", self.row_id, self.entity_path)?; + for (timeline, time) in &self.timepoint { + writeln!(f, "- {}: {}", timeline.name(), timeline.typ().format(*time))?; + } + + re_format::arrow::format_table( + self.cells.iter().map(|cell| cell.as_arrow_monolist()), + self.cells.iter().map(|cell| cell.component()), + ) + .fmt(f) + } +} + +// --- + +#[cfg(test)] +mod tests { + use super::*; + + use crate::component_types::{ColorRGBA, Label, Point2D}; + + #[test] + fn data_row_error_num_instances() { + let row_id = MsgId::ZERO; + let timepoint = TimePoint::timeless(); + + let num_instances = 2; + let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; + let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; + let labels: &[Label] = &[]; + + // 0 = clear: legal + DataRow::try_from_cells1(row_id, "a/b/c", timepoint.clone(), num_instances, labels) + .unwrap(); + + // 1 = splat: legal + DataRow::try_from_cells1(row_id, "a/b/c", timepoint.clone(), num_instances, colors) + .unwrap(); + + // num_instances = standard: legal + DataRow::try_from_cells1(row_id, "a/b/c", timepoint.clone(), num_instances, points) + .unwrap(); + + // anything else is illegal + let points: &[Point2D] = &[ + [10.0, 10.0].into(), + [20.0, 20.0].into(), + [30.0, 30.0].into(), + ]; + let err = DataRow::try_from_cells1(row_id, "a/b/c", timepoint, num_instances, points) + .unwrap_err(); + + match err { + DataRowError::WrongNumberOfInstances { + entity_path, + component, + expected_num_instances, + num_instances, + } => { + assert_eq!(EntityPath::from("a/b/c"), entity_path); + assert_eq!(Point2D::name(), component); + assert_eq!(2, expected_num_instances); + assert_eq!(3, num_instances); + } + _ => unreachable!(), + } + } +} diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs new file mode 100644 index 0000000000000..a47c94c42a872 --- /dev/null +++ b/crates/re_log_types/src/data_table.rs @@ -0,0 +1,275 @@ +use itertools::Itertools as _; +use nohash_hasher::{IntMap, IntSet}; + +use crate::{ComponentName, DataCell, DataRow, EntityPath, MsgId, TimePoint}; + +// TODO: any profile scopes needed? +// TODO: let's turn this into a msgbundle, and then another PR handles table and removes msgbundle + +// --- + +#[derive(thiserror::Error, Debug)] +pub enum DataTableError { + #[error("Could not serialize/deserialize component instances to/from Arrow")] + Arrow(#[from] arrow2::error::Error), +} + +pub type DataTableResult = ::std::result::Result; + +// --- + +// TODO: explain that while not contiguous... it in fact is + +// TODO +#[derive(Debug, Clone)] +pub struct DataTable { + /// Auto-generated [`TUID`], uniquely identifying this batch of data and keeping track of the + /// client's wall-clock. + // TODO(cmc): introduce EventId & BatchId + // TODO(cmc): use once batching lands + pub table_id: MsgId, + + /// The entire column of [`EventId`]s. + // TODO(cmc): introduce EventId & BatchId + pub row_id: Vec, + + /// The entire column of [`TimePoint`]s. + pub timepoint: Vec, + + /// The entire column of [`EntityPath`]s. + pub entity_path: Vec, + + /// The entire column of `num_instances`. + pub num_instances: Vec, + + /// All the rows for all the component columns. + /// + /// The cells are optional since not all rows will have data for every single component + /// (i.e. the table is sparse). + pub table: IntMap>>, +} + +impl DataTable { + /// Builds a new `DataTable` from an iterable of [`DataRow`]s. + // + // TODO: fails if? + // TODO: panicky version + pub fn from_rows(table_id: MsgId, rows: impl IntoIterator) -> Self { + let rows = rows.into_iter(); + + // Explode all rows into columns, and keep track of which components are involved. + let mut components = IntSet::default(); + let (row_id, timepoint, entity_path, num_instances, rows): ( + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + ) = rows + .map(|row| { + components.extend(row.components()); + let DataRow { + row_id, + timepoint, + entity_path, + num_instances, + cells, + } = row; + (row_id, timepoint, entity_path, num_instances, cells) + }) + .multiunzip(); + + // Pre-allocate all columns (one per component). + let mut table = IntMap::default(); + for component in components { + table.insert(component, vec![None; rows.len()]); + } + + // Fill all columns (where possible: data is likely sparse). + for (i, row) in rows.into_iter().enumerate() { + for cell in row { + let component = cell.component(); + // NOTE: unwrap cannot fail, all arrays pre-allocated above + table.get_mut(&component).unwrap()[i] = Some(cell); + } + } + + Self { + table_id, + row_id, + timepoint, + entity_path, + num_instances, + table, + } + } +} + +impl DataTable { + #[inline] + pub fn num_rows(&self) -> u32 { + self.row_id.len() as _ + } +} + +// --- + +// TODO: temporary transition period stuff + +use crate::msg_bundle::MsgBundle; + +impl DataTable { + pub fn as_rows(&self) -> impl ExactSizeIterator + '_ { + let num_rows = self.num_rows() as usize; + + let Self { + table_id: _, + row_id, + timepoint, + entity_path, + num_instances, + table, + } = self; + + (0..num_rows).map(move |i| { + let cells = table + .values() + .filter_map(|rows| rows[i].clone() /* shallow */); + + DataRow::from_cells( + row_id[i], + // TODO: get rid of those + timepoint[i].clone(), + entity_path[i].clone(), + num_instances[i], + cells, + ) + }) + } + + pub fn from_msg_bundle(msg_bundle: MsgBundle) -> Self { + let MsgBundle { + msg_id, + entity_path, + time_point, + cells, + } = msg_bundle; + + Self::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + msg_id, + time_point, + entity_path, + cells.first().map_or(0, |cell| cell.num_instances()), + cells, + )], + ) + } + + pub fn into_msg_bundle(self) -> MsgBundle { + let mut rows = self.as_rows(); + assert!(rows.len() == 1, "must have 1 row, got {}", rows.len()); // TODO(cmc): batching + let row = rows.next().unwrap(); + + let DataRow { + row_id, + timepoint, + entity_path, + num_instances: _, + cells, + } = row; + + let table_id = row_id; // ! + + MsgBundle::new(table_id, entity_path, timepoint, cells) + } +} + +// --- + +impl std::fmt::Display for DataTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for row in self.as_rows() { + writeln!(f, "{row}")?; + } + Ok(()) + } +} + +// --- + +// TODO: +// - append +// - error paths + +#[test] +fn data_table() { + use crate::{ + component_types::{ColorRGBA, Label, Point2D}, + Timeline, + }; + + let table_id = MsgId::random(); + + let timepoint = |frame_nr: i64, pouet: i64| { + TimePoint::from([ + ( + Timeline::new("frame_nr", crate::TimeType::Sequence), + frame_nr.into(), + ), // + ( + Timeline::new("pouet", crate::TimeType::Sequence), + pouet.into(), + ), // + ]) + }; + + let row1 = { + let num_instances = 2; + let points: &[_] = &[ + Point2D::new(10.0, 10.0), // + Point2D::new(20.0, 20.0), + ]; + let colors: &[_] = &[ + ColorRGBA::from_rgb(128, 128, 128), // + ]; + let labels: &[Label] = &[]; + + DataRow::from_cells3( + MsgId::random(), + "a", + timepoint(1, 1), + num_instances, + (points, colors, labels), + ) + }; + + let row2 = { + let num_instances = 0; + let colors: &[ColorRGBA] = &[]; + + DataRow::from_cells1(MsgId::random(), "b", timepoint(1, 2), num_instances, colors) + }; + + let row3 = { + let num_instances = 1; + let colors: &[_] = &[ + ColorRGBA::from_rgb(128, 128, 128), // + ]; + let labels: &[_] = &[Label("hey".into())]; + + DataRow::from_cells2( + MsgId::random(), + "c", + timepoint(2, 1), + num_instances, + (colors, labels), + ) + }; + + let table = DataTable::from_rows(table_id, [row1, row2, row3]); + eprintln!("{table}"); + + // TODO: checks +} diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index d0b20f2391973..0b6f1f83e3b67 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -15,6 +15,8 @@ mod component; pub mod component_types; mod data; mod data_cell; +mod data_row; +mod data_table; pub mod hash; mod index; pub mod msg_bundle; @@ -46,6 +48,8 @@ pub use self::component_types::ViewCoordinates; pub use self::component_types::{EncodedMesh3D, Mesh3D, MeshFormat, MeshId, RawMesh3D}; pub use self::data::*; pub use self::data_cell::{DataCell, DataCellError, DataCellResult}; +pub use self::data_row::{DataRow, DataRowError, DataRowResult}; +pub use self::data_table::{DataTable, DataTableError, DataTableResult}; pub use self::index::*; pub use self::path::*; pub use self::time::{Duration, Time}; diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs index 4640ea59b17f2..660b207cec4ea 100644 --- a/crates/re_log_types/src/msg_bundle.rs +++ b/crates/re_log_types/src/msg_bundle.rs @@ -24,7 +24,6 @@ use arrow2::{ array::{Array, ListArray, StructArray}, chunk::Chunk, datatypes::{DataType, Field, Schema}, - offset::Offsets, }; use arrow2_convert::{field::ArrowField, serialize::TryIntoArrow}; @@ -35,6 +34,7 @@ use crate::{ // --- +// TODO: can probably make that one pub(crate) already /// The errors that can occur when trying to convert between Arrow and `MessageBundle` types #[derive(thiserror::Error, Debug)] pub enum MsgBundleError { @@ -62,7 +62,7 @@ pub enum MsgBundleError { #[error("Could not serialize components to Arrow")] ArrowSerializationError(#[from] arrow2::error::Error), - #[error(transparent)] + #[error("Error with one or more the underlying data cells")] DataCell(#[from] DataCellError), // Needed to handle TryFrom -> T @@ -97,26 +97,18 @@ impl MsgBundle { /// /// The `MsgId` will automatically be appended as a component to the given `bundles`, allowing /// the backend to keep track of the origin of any row of data. - pub fn new( + pub(crate) fn new( msg_id: MsgId, entity_path: EntityPath, time_point: TimePoint, - components: Vec, + cells: Vec, ) -> Self { - let mut this = Self { + Self { msg_id, entity_path, time_point, - cells: components, - }; - - // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, - // we need to craft an array of `MsgId`s that matches the length of the other components. - this.cells.push(DataCell::from_native( - vec![msg_id; this.num_instances()].iter(), - )); - - this + cells, + } } /// Returns the number of component collections in this bundle, i.e. the length of the bundle @@ -316,89 +308,3 @@ fn extract_components(schema: &Schema, msg: &Chunk>) -> Result) -> ListArray { - let datatype = ListArray::::default_datatype(field_array.data_type().clone()); - let offsets = Offsets::try_from_lengths(std::iter::once(field_array.len())) - .unwrap() - .into(); - let values = field_array; - let validity = None; - ListArray::::new(datatype, offsets, values, validity) -} - -/// Helper to build a `MessageBundle` from 1 component -pub fn try_build_msg_bundle1( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: C0, -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![into_cells.try_into()?], - )) -} - -/// Helper to build a `MessageBundle` from 2 components -pub fn try_build_msg_bundle2( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: (C0, C1), -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - C1: TryInto, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![into_cells.0.try_into()?, into_cells.1.try_into()?], - )) -} - -/// Helper to build a `MessageBundle` from 3 components -pub fn try_build_msg_bundle3( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: (C0, C1, C2), -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - C1: TryInto, - C2: TryInto, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![ - into_cells.0.try_into()?, - into_cells.1.try_into()?, - into_cells.2.try_into()?, - ], - )) -} diff --git a/crates/re_query/benches/query_benchmark.rs b/crates/re_query/benches/query_benchmark.rs index 3d302faa3faaf..d65c2947cbeba 100644 --- a/crates/re_query/benches/query_benchmark.rs +++ b/crates/re_query/benches/query_benchmark.rs @@ -8,9 +8,7 @@ use re_arrow_store::{DataStore, LatestAtQuery}; use re_log_types::{ component_types::{ColorRGBA, InstanceKey, Point2D, Vec3D}, datagen::{build_frame_nr, build_some_colors, build_some_point2d, build_some_vec3d}, - entity_path, - msg_bundle::{try_build_msg_bundle1, try_build_msg_bundle2, MsgBundle}, - Component, EntityPath, Index, MsgId, TimeType, Timeline, + entity_path, Component, DataRow, EntityPath, Index, MsgId, TimeType, Timeline, }; use re_query::query_entity_with_primary; @@ -45,7 +43,7 @@ fn mono_points(c: &mut Criterion) { let paths = (0..NUM_POINTS) .map(move |point_idx| entity_path!("points", Index::Sequence(point_idx as _))) .collect_vec(); - let msgs = build_points_messages(&paths, 1); + let msgs = build_points_rows(&paths, 1); { let mut group = c.benchmark_group("arrow_mono_points"); @@ -55,14 +53,14 @@ fn mono_points(c: &mut Criterion) { (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_mono_points"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_points(&mut store, &paths)); }); @@ -72,7 +70,7 @@ fn mono_points(c: &mut Criterion) { fn batch_points(c: &mut Criterion) { // Batch points are logged together at a single path let paths = [EntityPath::from("points")]; - let msgs = build_points_messages(&paths, NUM_POINTS as _); + let msgs = build_points_rows(&paths, NUM_POINTS as _); { let mut group = c.benchmark_group("arrow_batch_points"); @@ -80,14 +78,14 @@ fn batch_points(c: &mut Criterion) { (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_batch_points"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_points(&mut store, &paths)); }); @@ -97,7 +95,7 @@ fn batch_points(c: &mut Criterion) { fn batch_vecs(c: &mut Criterion) { // Batch points are logged together at a single path let paths = [EntityPath::from("vec")]; - let msgs = build_vecs_messages(&paths, NUM_VECS as _); + let msgs = build_vecs_rows(&paths, NUM_VECS as _); { let mut group = c.benchmark_group("arrow_batch_vecs"); @@ -105,14 +103,14 @@ fn batch_vecs(c: &mut Criterion) { (NUM_VECS * NUM_FRAMES_VECS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_batch_vecs"); group.throughput(criterion::Throughput::Elements(NUM_VECS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_vecs(&mut store, &paths)); }); @@ -121,41 +119,41 @@ fn batch_vecs(c: &mut Criterion) { // --- Helpers --- -fn build_points_messages(paths: &[EntityPath], pts: usize) -> Vec { +fn build_points_rows(paths: &[EntityPath], pts: usize) -> Vec { (0..NUM_FRAMES_POINTS) .flat_map(move |frame_idx| { paths.iter().map(move |path| { - try_build_msg_bundle2( + DataRow::from_cells2( MsgId::ZERO, path.clone(), [build_frame_nr((frame_idx as i64).into())], + pts as _, (build_some_point2d(pts), build_some_colors(pts)), ) - .unwrap() }) }) .collect() } -fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec { +fn build_vecs_rows(paths: &[EntityPath], pts: usize) -> Vec { (0..NUM_FRAMES_VECS) .flat_map(move |frame_idx| { paths.iter().map(move |path| { - try_build_msg_bundle1( + DataRow::from_cells1( MsgId::ZERO, path.clone(), [build_frame_nr((frame_idx as i64).into())], + pts as _, build_some_vec3d(pts), ) - .unwrap() }) }) .collect() } -fn insert_messages<'a>(msgs: impl Iterator) -> DataStore { +fn insert_rows<'a>(msgs: impl Iterator) -> DataStore { let mut store = DataStore::new(InstanceKey::name(), Default::default()); - msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap()); + msgs.for_each(|row| store.insert_row(row).unwrap()); store } diff --git a/crates/re_query/examples/range.rs b/crates/re_query/examples/range.rs index 576813cec255d..47251d85c75c8 100644 --- a/crates/re_query/examples/range.rs +++ b/crates/re_query/examples/range.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, RangeQuery, TimeRange}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - msg_bundle::try_build_msg_bundle1, - Component as _, EntityPath, MsgId, TimeType, + Component as _, DataRow, EntityPath, MsgId, TimeType, }; use re_query::range_entity_with_primary; @@ -24,28 +23,28 @@ fn main() { let frame4 = [build_frame_nr(4.into())]; let rects = build_some_rects(2); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame1, &rects).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame1, 2, &rects); + store.insert_row(&row).unwrap(); let points = build_some_point2d(2); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame2, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame2, 2, &points); + store.insert_row(&row).unwrap(); let points = build_some_point2d(4); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame3, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame3, 4, &points); + store.insert_row(&row).unwrap(); let rects = build_some_rects(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &rects); + store.insert_row(&row).unwrap(); let points = build_some_point2d(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &points); + store.insert_row(&row).unwrap(); let rects = build_some_rects(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &rects); + store.insert_row(&row).unwrap(); let query = RangeQuery::new(frame2[0].0, TimeRange::new(frame2[0].1, frame4[0].1)); diff --git a/crates/re_query/src/query.rs b/crates/re_query/src/query.rs index 8731e8d202645..f52130654907a 100644 --- a/crates/re_query/src/query.rs +++ b/crates/re_query/src/query.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use re_arrow_store::{DataStore, LatestAtQuery}; -use re_log_types::{component_types::InstanceKey, Component, ComponentName, EntityPath}; +use re_log_types::{component_types::InstanceKey, Component, ComponentName, DataRow, EntityPath}; use crate::{ComponentWithInstances, EntityView, QueryError}; @@ -148,7 +148,6 @@ pub fn __populate_example_store() -> DataStore { use re_log_types::{ component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle2, MsgId, }; @@ -160,15 +159,26 @@ pub fn __populate_example_store() -> DataStore { let instances = vec![InstanceKey(42), InstanceKey(96)]; let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (&instances, &points)).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells2( + MsgId::ZERO, + ent_path, + timepoint, + instances.len() as _, + (&instances, &points), + ); + store.insert_row(&row).unwrap(); let instances = vec![InstanceKey(96)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = - try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (instances, colors)).unwrap(); - store.insert_row(&bundle).unwrap(); + + let row = DataRow::from_cells2( + MsgId::ZERO, + ent_path, + timepoint, + instances.len() as _, + (instances, colors), + ); + store.insert_row(&row).unwrap(); store } diff --git a/crates/re_query/tests/query_tests.rs b/crates/re_query/tests/query_tests.rs index 45032d9ea2d38..a251941bf54ac 100644 --- a/crates/re_query/tests/query_tests.rs +++ b/crates/re_query/tests/query_tests.rs @@ -5,9 +5,7 @@ use re_log_types::{ component_types::InstanceKey, component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle1, - msg_bundle::try_build_msg_bundle2, - Component, MsgId, + Component, DataRow, MsgId, }; use re_query::query_entity_with_primary; @@ -20,20 +18,20 @@ fn simple_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path, timepoint, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -91,15 +89,14 @@ fn timeless_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance.. timelessly! let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = - try_build_msg_bundle2(MsgId::random(), ent_path, [], (color_instances, colors)).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells2(MsgId::random(), ent_path, [], 1, (color_instances, colors)); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -157,13 +154,13 @@ fn no_instance_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign them colors with explicit instances let colors = vec![ColorRGBA(0xff000000), ColorRGBA(0x00ff0000)]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &colors).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &colors); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -221,8 +218,8 @@ fn missing_column_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, points); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -279,20 +276,20 @@ fn splatted_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, points); + store.insert_row(&row).unwrap(); // Assign all of them a color via splat let color_instances = vec![InstanceKey::SPLAT]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path, timepoint, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); diff --git a/crates/re_query/tests/range_tests.rs b/crates/re_query/tests/range_tests.rs index e75bb22c67a51..54f2cf62a3d7e 100644 --- a/crates/re_query/tests/range_tests.rs +++ b/crates/re_query/tests/range_tests.rs @@ -5,9 +5,7 @@ use re_log_types::{ component_types::InstanceKey, component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle1, - msg_bundle::try_build_msg_bundle2, - Component, EntityPath, MsgId, + Component, DataRow, EntityPath, MsgId, }; use re_query::range_entity_with_primary; @@ -21,21 +19,20 @@ fn simple_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -43,23 +40,22 @@ fn simple_range() { // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(0)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, &points); + store.insert_row(&row).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- @@ -241,35 +237,34 @@ fn timeless_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, &points); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), [], 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances.clone(), colors.clone()), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), [], + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -277,37 +272,36 @@ fn timeless_range() { // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(0)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances.clone(), colors.clone()), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, &points); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), [], 2, &points); + store.insert_row(&row).unwrap(); } // ┌───────────┬──────────┬────────┬─────────────────┬────────────────────┬──────────────────────┬────────────────────────────┐ @@ -676,21 +670,20 @@ fn simple_splatted_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -698,23 +691,22 @@ fn simple_splatted_range() { // Assign one of them a color with a splatted instance let color_instances = vec![InstanceKey::SPLAT]; let colors = vec![ColorRGBA(0x00ff0000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert_row(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert_row(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, points); + store.insert_row(&row).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 12b3d2ff12a0a..cc15b225edd2b 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,4 +1,4 @@ -use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError}; +use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError, DataRow, DataTable}; use nohash_hasher::IntMap; @@ -289,7 +289,7 @@ impl MsgSender { entity_path, timepoint, timeless, - num_instances: _, + num_instances, instanced, mut splatted, } = self; @@ -349,30 +349,53 @@ impl MsgSender { let mut msgs = [(); 3].map(|_| None); + // TODO: still shouldnt have any into_msg_bundle here + // Standard msgs[0] = (!standard_cells.is_empty()).then(|| { - MsgBundle::new( - MsgId::random(), - entity_path.clone(), - timepoint.clone(), - standard_cells, + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_instances.unwrap_or(0), + standard_cells, + )], ) + .into_msg_bundle() }); // Transforms msgs[1] = (!transform_cells.is_empty()).then(|| { - MsgBundle::new( - MsgId::random(), - entity_path.clone(), - timepoint.clone(), - transform_cells, + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_transform_instances, + transform_cells, + )], ) + .into_msg_bundle() }); // Splats + // TODO(cmc): unsplit splats once new data cells are in msgs[2] = (!splatted.is_empty()).then(|| { splatted.push(DataCell::from_native(&[InstanceKey::SPLAT])); - MsgBundle::new(MsgId::random(), entity_path, timepoint, splatted) + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint, + entity_path, + 1, + splatted, + )], + ) + .into_msg_bundle() }); Ok(msgs) diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 433ab9b91c3e2..35ac4e5554b90 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -1,6 +1,7 @@ //! Methods for handling Arrow datamodel log ingest use arrow2::{array::Array, datatypes::Field, ffi}; +use itertools::Itertools as _; use pyo3::{ exceptions::{PyAttributeError, PyValueError}, ffi::Py_uintptr_t, @@ -9,9 +10,8 @@ use pyo3::{ PyAny, PyResult, }; use re_log_types::{ - component_types, - msg_bundle::{MsgBundle, MsgBundleError}, - DataCell, EntityPath, LogMsg, MsgId, TimePoint, + component_types, msg_bundle::MsgBundleError, DataCell, DataRow, EntityPath, LogMsg, MsgId, + TimePoint, }; /// Perform conversion between a pyarrow array to arrow2 types. @@ -102,15 +102,21 @@ pub fn build_chunk_from_components( .into_iter() .zip(fields.into_iter()) .map(|(value, field)| DataCell::from_arrow(field.name.into(), value)) - .collect(); + .collect_vec(); - let msg_bundle = MsgBundle::new( + let num_instances = cells.first().map_or(0, |cell| cell.num_instances()); + let row = DataRow::from_cells( MsgId::random(), - entity_path.clone(), time_point.clone(), + entity_path.clone(), + num_instances, cells, ); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle .try_into() .map_err(|e: MsgBundleError| PyValueError::new_err(e.to_string()))?; diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 20c92e8d82fea..3207d2a4a3d33 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -11,6 +11,7 @@ use pyo3::{ types::PyDict, }; +use re_log_types::DataRow; use rerun::{ log::{LogMsg, MsgBundle, MsgId, PathOp}, time::{Time, TimeInt, TimePoint, TimeType, Timeline}, @@ -463,14 +464,18 @@ fn log_transform( // python side will take a bit of additional work and testing to ensure we aren't // introducing new numerical issues. - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![transform].try_into().unwrap()], + 1, + [transform].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -543,14 +548,19 @@ fn log_view_coordinates( // non-trivial. Implementing this functionality on the python side will take // a bit of additional work and testing to ensure we aren't introducing new // conversion errors. - let bundle = MsgBundle::new( + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![coordinates].try_into().unwrap()], + 1, + [coordinates].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -674,14 +684,18 @@ fn log_meshes( // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![meshes.try_into().unwrap()], + meshes.len() as _, + meshes, ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -752,14 +766,18 @@ fn log_mesh_file( // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![mesh3d].try_into().unwrap()], + 1, + [mesh3d].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -829,26 +847,30 @@ fn log_image_file( let time_point = time(timeless); - let bundle = MsgBundle::new( + let tensor = re_log_types::component_types::Tensor { + tensor_id: TensorId::random(), + shape: vec![ + TensorDimension::height(h as _), + TensorDimension::width(w as _), + TensorDimension::depth(3), + ], + data: re_log_types::component_types::TensorData::JPEG(img_bytes.into()), + meaning: re_log_types::component_types::TensorDataMeaning::Unknown, + meter: None, + }; + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![re_log_types::component_types::Tensor { - tensor_id: TensorId::random(), - shape: vec![ - TensorDimension::height(h as _), - TensorDimension::width(w as _), - TensorDimension::depth(3), - ], - data: re_log_types::component_types::TensorData::JPEG(img_bytes.into()), - meaning: re_log_types::component_types::TensorDataMeaning::Unknown, - meter: None, - }] - .try_into() - .unwrap()], + 1, + [tensor].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -916,14 +938,19 @@ fn log_annotation_context( // implementation. // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![annotation_context.clone()].try_into().unwrap()], + 1, + [annotation_context].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg));