diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 1ae7c0f02ef06..cd055537b0a91 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time use re_log_types::{ component_types::{InstanceKey, Rect2D}, datagen::{build_frame_nr, build_some_instances, build_some_rects}, - msg_bundle::{try_build_msg_bundle2, MsgBundle}, - Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline, + Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline, }; // --- @@ -27,28 +26,30 @@ const NUM_RECTS: i64 = 1; // --- Benchmarks --- +// TODO(cmc): need additional benches for full tables + fn insert(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); + let rows = build_rows(NUM_RECTS as usize); let mut group = c.benchmark_group("datastore/insert/batch/rects"); group.throughput(criterion::Throughput::Elements( (NUM_RECTS * NUM_FRAMES) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(Default::default(), InstanceKey::name(), msgs.iter())); + b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter())); }); } } fn latest_at_batch(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter()); + let rows = build_rows(NUM_RECTS as usize); + let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter()); let mut group = c.benchmark_group("datastore/latest_at/batch/rects"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("query", |b| { b.iter(|| { - let results = latest_messages_at(&store, Rect2D::name(), &[Rect2D::name()]); + let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]); let rects = results[0] .as_ref() .unwrap() @@ -70,27 +71,27 @@ fn latest_at_missing_components(c: &mut Criterion) { }; { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(config.clone(), InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/latest_at/missing_components"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("primary", |b| { b.iter(|| { let results = - latest_messages_at(&store, "non_existing_component".into(), &[Rect2D::name()]); + latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]); assert!(results[0].is_none()); }); }); } { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(config, InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(config, InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/latest_at/missing_components"); group.throughput(criterion::Throughput::Elements(NUM_RECTS as _)); group.bench_function("secondaries", |b| { b.iter(|| { - let results = latest_messages_at( + let results = latest_data_at( &store, Rect2D::name(), &[ @@ -109,15 +110,15 @@ fn latest_at_missing_components(c: &mut Criterion) { fn range_batch(c: &mut Criterion) { { - let msgs = build_messages(NUM_RECTS as usize); - let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter()); + let msgs = build_rows(NUM_RECTS as usize); + let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter()); let mut group = c.benchmark_group("datastore/range/batch/rects"); group.throughput(criterion::Throughput::Elements( (NUM_RECTS * NUM_FRAMES) as _, )); group.bench_function("query", |b| { b.iter(|| { - let msgs = range_messages(&store, [Rect2D::name()]); + let msgs = range_data(&store, [Rect2D::name()]); for (cur_time, (time, results)) in msgs.enumerate() { let time = time.unwrap(); assert_eq!(cur_time as i64, time.as_i64()); @@ -146,31 +147,31 @@ criterion_main!(benches); // --- Helpers --- -fn build_messages(n: usize) -> Vec { +fn build_rows(n: usize) -> Vec { (0..NUM_FRAMES) .map(move |frame_idx| { - try_build_msg_bundle2( - MsgId::ZERO, + DataRow::from_cells2( + MsgId::random(), "rects", [build_frame_nr(frame_idx.into())], + n as _, (build_some_instances(n), build_some_rects(n)), ) - .unwrap() }) .collect() } -fn insert_messages<'a>( +fn insert_rows<'a>( config: DataStoreConfig, cluster_key: ComponentName, - msgs: impl Iterator, + rows: impl Iterator, ) -> DataStore { let mut store = DataStore::new(cluster_key, config); - msgs.for_each(|msg_bundle| store.insert(msg_bundle).unwrap()); + rows.for_each(|row| store.insert_row(row).unwrap()); store } -fn latest_messages_at( +fn latest_data_at( store: &DataStore, primary: ComponentName, secondaries: &[ComponentName; N], @@ -185,7 +186,7 @@ fn latest_messages_at( store.get(secondaries, &row_indices) } -fn range_messages( +fn range_data( store: &DataStore, components: [ComponentName; N], ) -> impl Iterator, [Option>; N])> + '_ { diff --git a/crates/re_arrow_store/examples/dump_dataframe.rs b/crates/re_arrow_store/examples/dump_dataframe.rs index 0ec908b302fe9..2caa12696aa14 100644 --- a/crates/re_arrow_store/examples/dump_dataframe.rs +++ b/crates/re_arrow_store/examples/dump_dataframe.rs @@ -4,7 +4,7 @@ //! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe //! ``` -use re_arrow_store::{test_bundle, DataStore}; +use re_arrow_store::{test_row, DataStore}; use re_log_types::{ component_types::InstanceKey, datagen::{ @@ -25,51 +25,51 @@ fn main() { ]; for ent_path in &ent_paths { - let bundle1 = test_bundle!(ent_path @ [ + let bundle1 = test_row!(ent_path @ [ build_frame_nr(1.into()), build_log_time(Time::now()), - ] => [build_some_instances(2), build_some_rects(2)]); - store.insert(&bundle1).unwrap(); + ] => 2; [build_some_instances(2), build_some_rects(2)]); + store.insert_row(&bundle1).unwrap(); } for ent_path in &ent_paths { - let bundle2 = test_bundle!(ent_path @ [ + let bundle2 = test_row!(ent_path @ [ build_frame_nr(2.into()) - ] => [build_some_instances(2), build_some_point2d(2)]); - store.insert(&bundle2).unwrap(); + ] => 2; [build_some_instances(2), build_some_point2d(2)]); + store.insert_row(&bundle2).unwrap(); // Insert timelessly too! let bundle2 = - test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]); - store.insert(&bundle2).unwrap(); + test_row!(ent_path @ [] => 2; [build_some_instances(2), build_some_point2d(2)]); + store.insert_row(&bundle2).unwrap(); - let bundle3 = test_bundle!(ent_path @ [ + let bundle3 = test_row!(ent_path @ [ build_frame_nr(3.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert(&bundle3).unwrap(); + ] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]); + store.insert_row(&bundle3).unwrap(); // Insert timelessly too! - let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert(&bundle3).unwrap(); + let bundle3 = test_row!(ent_path @ [] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]); + store.insert_row(&bundle3).unwrap(); } for ent_path in &ent_paths { - let bundle4_1 = test_bundle!(ent_path @ [ + let bundle4_1 = test_row!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(20..23), build_some_rects(3)]); - store.insert(&bundle4_1).unwrap(); + ] => 3; [build_some_instances_from(20..23), build_some_rects(3)]); + store.insert_row(&bundle4_1).unwrap(); - let bundle4_15 = test_bundle!(ent_path @ [ + let bundle4_15 = test_row!(ent_path @ [ build_frame_nr(4.into()), - ] => [build_some_instances_from(20..23), build_some_point2d(3)]); - store.insert(&bundle4_15).unwrap(); + ] => 3; [build_some_instances_from(20..23), build_some_point2d(3)]); + store.insert_row(&bundle4_15).unwrap(); - let bundle4_2 = test_bundle!(ent_path @ [ + let bundle4_2 = test_row!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..28), build_some_rects(3)]); - store.insert(&bundle4_2).unwrap(); + ] => 3; [build_some_instances_from(25..28), build_some_rects(3)]); + store.insert_row(&bundle4_2).unwrap(); - let bundle4_25 = test_bundle!(ent_path @ [ + let bundle4_25 = test_row!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), - ] => [build_some_instances_from(25..28), build_some_point2d(3)]); - store.insert(&bundle4_25).unwrap(); + ] => 3; [build_some_instances_from(25..28), build_some_point2d(3)]); + store.insert_row(&bundle4_25).unwrap(); } let df = store.to_dataframe(); diff --git a/crates/re_arrow_store/examples/latest_component.rs b/crates/re_arrow_store/examples/latest_component.rs index 16b441fe3a1e3..b31bf552281cf 100644 --- a/crates/re_arrow_store/examples/latest_component.rs +++ b/crates/re_arrow_store/examples/latest_component.rs @@ -5,7 +5,7 @@ //! ``` use re_arrow_store::polars_util::latest_component; -use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline}; use re_log_types::component_types::Rect2D; use re_log_types::datagen::build_some_rects; use re_log_types::{ @@ -19,11 +19,11 @@ fn main() { let ent_path = EntityPath::from("my/entity"); - let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); diff --git a/crates/re_arrow_store/examples/latest_components.rs b/crates/re_arrow_store/examples/latest_components.rs index e6db104ae9eb6..145f379b58c51 100644 --- a/crates/re_arrow_store/examples/latest_components.rs +++ b/crates/re_arrow_store/examples/latest_components.rs @@ -6,7 +6,7 @@ use polars_core::prelude::*; use re_arrow_store::polars_util::latest_components; -use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline}; +use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, @@ -18,11 +18,11 @@ fn main() { let ent_path = EntityPath::from("my/entity"); - let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let df = latest_components( diff --git a/crates/re_arrow_store/examples/range_components.rs b/crates/re_arrow_store/examples/range_components.rs index d0fe7b8d732c4..6e6c3267d63bc 100644 --- a/crates/re_arrow_store/examples/range_components.rs +++ b/crates/re_arrow_store/examples/range_components.rs @@ -5,7 +5,7 @@ //! ``` use polars_core::prelude::JoinType; -use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange}; +use re_arrow_store::{polars_util, test_row, DataStore, RangeQuery, TimeRange}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, @@ -22,26 +22,26 @@ fn main() { let frame3 = 3.into(); let frame4 = 4.into(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_point2d(2)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame3)] => 4; [build_some_point2d(4)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame4)] => 1; [build_some_point2d(1)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]); + store.insert_row(&bundle).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]); - store.insert(&bundle).unwrap(); + let bundle = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_point2d(3)]); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into())); diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 7655aefd2e056..d8595501e86e4 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -225,7 +225,7 @@ pub struct DataStore { /// Used to cache auto-generated cluster components, i.e. `[0]`, `[0, 1]`, `[0, 1, 2]`, etc /// so that they can be properly deduplicated. - pub(crate) cluster_comp_cache: IntMap, + pub(crate) cluster_comp_cache: IntMap, /// Dedicated index tables for timeless data. Never garbage collected. /// diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs index 944ff5f36e3c0..6a24abe2fff21 100644 --- a/crates/re_arrow_store/src/store_polars.rs +++ b/crates/re_arrow_store/src/store_polars.rs @@ -15,6 +15,8 @@ use crate::{ PersistentIndexTable, RowIndex, }; +// TODO: all of this stuff should be defined by Data{Cell,Row,Table}, not the store + // --- impl DataStore { diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 245479bd9d097..64898e8c2f682 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -8,8 +8,8 @@ use parking_lot::RwLock; use re_log::{debug, trace}; use re_log_types::{ - msg_bundle::{wrap_in_listarray, MsgBundle}, - ComponentName, DataCell, EntityPath, MsgId, TimeInt, TimePoint, TimeRange, Timeline, + component_types::InstanceKey, Component, ComponentName, DataCell, DataRow, DataTable, + EntityPath, MsgId, TimeInt, TimePoint, TimeRange, Timeline, }; use crate::{ @@ -18,6 +18,9 @@ use crate::{ RowIndexKind, TimeIndex, }; +// TODO: the store needs to shuffle around DataCells, not raw arrays straight from the depths of +// hell. + // --- Data store --- #[derive(thiserror::Error, Debug)] @@ -40,9 +43,9 @@ pub enum WriteError { )] MismatchedInstances { cluster_comp: ComponentName, - cluster_comp_nb_instances: usize, + cluster_comp_nb_instances: u32, key: ComponentName, - num_instances: usize, + num_instances: u32, }, // Misc @@ -53,6 +56,20 @@ pub enum WriteError { pub type WriteResult = ::std::result::Result; impl DataStore { + // TODO: is there anything smarter to do server-side batching wise? + pub fn insert_table(&mut self, table: &DataTable) -> WriteResult<()> { + // TODO: explain that the magic of batching is in how the data is all in the same place + // TODO: need coalescing server-side too + for row in table.as_rows() { + // TODO: should we fail? + self.insert_row(&row)?; + } + + Ok(()) + } + + // TODO: this _is_ insert_row, the rest is insert_cell... + // TODO: update /// Inserts a [`MsgBundle`]'s worth of components into the datastore. /// /// * All components within a single row must share the same number of instances. @@ -60,57 +77,47 @@ impl DataStore { /// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated /// based on the length of the components in the payload, in the form of an array of /// monotonically increasing u64s going from `0` to `N-1`. - pub fn insert(&mut self, msg: &MsgBundle) -> WriteResult<()> { + pub fn insert_row(&mut self, row: &DataRow) -> WriteResult<()> { // TODO(cmc): kind & insert_id need to somehow propagate through the span system. self.insert_id += 1; - let MsgBundle { - msg_id, - entity_path: ent_path, - time_point, - cells, - } = msg; - - if cells.is_empty() { + if row.num_components() == 0 { return Ok(()); } crate::profile_function!(); - let ent_path_hash = ent_path.hash(); + let DataRow { + row_id, + timepoint, + entity_path: ent_path, + num_instances: _, + cells, + } = row; - // TODO(cmc): remove this thing when removing MsgBundle. - // - // Effectively the same thing as having a non-unit length batch, except it's really not - // worth more than an assertion since: - // - A) `MsgBundle` should already guarantee this - // - B) this limitation should be gone soon enough - debug_assert!( - msg.cells.iter().map(|cell| cell.name()).all_unique(), - "cannot insert same component multiple times, this is equivalent to multiple rows", - ); + let ent_path_hash = ent_path.hash(); trace!( kind = "insert", id = self.insert_id, cluster_key = %self.cluster_key, - timelines = ?time_point.iter() + timelines = ?timepoint.iter() .map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time))) .collect::>(), entity = %ent_path, - components = ?cells.iter().map(|cell| cell.name()).collect_vec(), + components = ?cells.iter().map(|cell| cell.component()).collect_vec(), "insertion started..." ); let cluster_comp_pos = cells .iter() - .find_position(|cell| cell.name() == self.cluster_key) + .find_position(|cell| cell.component() == self.cluster_key) .map(|(pos, _)| pos); - if time_point.is_timeless() { + if timepoint.is_timeless() { let mut row_indices = IntMap::default(); - self.insert_timeless_row(cluster_comp_pos, cells, &mut row_indices)?; + self._insert_timeless_row(cluster_comp_pos, cells, &mut row_indices)?; let index = self .timeless_indices @@ -120,9 +127,9 @@ impl DataStore { } else { let mut row_indices = IntMap::default(); - self.insert_row(time_point, cluster_comp_pos, cells, &mut row_indices)?; + self._insert_row(timepoint, cluster_comp_pos, cells, &mut row_indices)?; - for (timeline, time) in time_point.iter() { + for (timeline, time) in timepoint.iter() { let ent_path = ent_path.clone(); // shallow let index = self .indices @@ -133,12 +140,12 @@ impl DataStore { } // This is valuable information, even for a timeless timepoint! - self.messages.insert(*msg_id, time_point.clone()); + self.messages.insert(*row_id, timepoint.clone()); Ok(()) } - fn insert_timeless_row( + fn _insert_timeless_row( &mut self, cluster_comp_pos: Option, cells: &[DataCell], @@ -167,42 +174,34 @@ impl DataStore { for cell in cells .iter() - .filter(|bundle| bundle.name() != self.cluster_key) + .filter(|cell| cell.component() != self.cluster_key) { - let (name, rows) = (cell.name(), cell.as_arrow()); - - // TODO(cmc): A) wrapping this doesn't make sense and B) we should not work directly - // with arrow Arrays. - let rows_single = wrap_in_listarray(rows); + let component = cell.component(); + let num_instances = cell.len(); + let rows_single = cell.as_arrow_monolist(); // TODO(cmc): batching - let num_instances = rows_single.offsets().lengths().next().unwrap(); if num_instances != cluster_len { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: name, + key: component, num_instances, }); } let table = self .timeless_components - .entry(cell.name()) - .or_insert_with(|| { - PersistentComponentTable::new( - name, - ListArray::::get_child_type(rows_single.data_type()), - ) - }); + .entry(cell.component()) + .or_insert_with(|| PersistentComponentTable::new(component, cell.datatype())); - let row_idx = table.push(&*rows_single.to_boxed()); - row_indices.insert(name, row_idx); + let row_idx = table.push(&*rows_single); + row_indices.insert(component, row_idx); } Ok(()) } - fn insert_row( + fn _insert_row( &mut self, time_point: &TimePoint, cluster_comp_pos: Option, @@ -230,33 +229,30 @@ impl DataStore { ); } - for cell in cells.iter().filter(|cell| cell.name() != self.cluster_key) { - let (name, rows) = (cell.name(), cell.as_arrow()); - - // TODO(cmc): A) wrapping this doesn't make sense and B) we should not work directly - // with arrow Arrays. - let rows_single = wrap_in_listarray(rows); + for cell in cells + .iter() + .filter(|cell| cell.component() != self.cluster_key) + { + let component = cell.component(); + let num_instances = cell.len(); + let rows_single = cell.as_arrow_monolist(); // TODO(cmc): batching - // TODO(#440): support for splats - let num_instances = rows_single.offsets().lengths().next().unwrap(); if num_instances != cluster_len { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: name, + key: component, num_instances, }); } - let table = self.components.entry(cell.name()).or_insert_with(|| { - ComponentTable::new( - name, - ListArray::::get_child_type(rows_single.data_type()), - ) - }); + let table = self + .components + .entry(cell.component()) + .or_insert_with(|| ComponentTable::new(component, cell.datatype())); - let row_idx = table.push(&self.config, time_point, &*rows_single.to_boxed()); - row_indices.insert(name, row_idx); + let row_idx = table.push(&self.config, time_point, &*rows_single); + row_indices.insert(component, row_idx); } Ok(()) @@ -273,13 +269,13 @@ impl DataStore { cluster_comp_pos: Option, components: &[DataCell], time_point: &TimePoint, - ) -> WriteResult<(RowIndex, usize)> { + ) -> WriteResult<(RowIndex, u32)> { crate::profile_function!(); enum ClusterData { Cached(RowIndex), GenData(Box), - UserData(ListArray), + UserData(Box), } let (cluster_len, cluster_data) = if let Some(cluster_comp_pos) = cluster_comp_pos { @@ -288,7 +284,7 @@ impl DataStore { let cluster_comp = &components[cluster_comp_pos]; let data = cluster_comp.as_arrow(); - let len = data.len(); + let len = data.len() as u32; // Clustering component must be dense. if !data.is_dense() { @@ -299,10 +295,7 @@ impl DataStore { return Err(WriteError::InvalidClusteringComponent(data.clone())); } - ( - len, - ClusterData::UserData(wrap_in_listarray(cluster_comp.as_arrow())), - ) + (len, ClusterData::UserData(cluster_comp.as_arrow_monolist())) } else { // The caller has not specified any cluster component, and so we'll have to generate // one... unless we've already generated one of this exact length in the past, @@ -316,9 +309,9 @@ impl DataStore { // Cache hit! Re-use that row index. (len, ClusterData::Cached(*row_idx)) } else { - // Cache miss! Craft a new u64 array from the ground up. - let data = UInt64Array::from_vec((0..len as u64).collect_vec()).boxed(); - let data = wrap_in_listarray(data).to_boxed(); + // Cache miss! Craft a new instance keys from the ground up. + let data = + DataCell::from_component::(0..len as u64).as_arrow_monolist(); (len, ClusterData::GenData(data)) } }; @@ -921,8 +914,7 @@ impl PersistentComponentTable { /// `datatype` must be the type of the component itself, devoid of any wrapping layers /// (i.e. _not_ a `ListArray<...>`!). fn new(name: ComponentName, datatype: &DataType) -> Self { - // TODO(cmc): think about this when implementing deletion. - let chunks = vec![wrap_in_listarray(new_empty_array(datatype.clone())).to_boxed()]; + let chunks = vec![DataCell::from_arrow_empty(name, datatype.clone()).as_arrow_monolist()]; let total_rows = chunks.iter().map(|values| values.len() as u64).sum(); let total_size_bytes = chunks .iter() @@ -1093,8 +1085,9 @@ impl ComponentBucket { pub fn new(name: ComponentName, datatype: &DataType, row_offset: u64) -> Self { // If this is the first bucket of this table, we need to insert an empty list at // row index #0! + // TODO(cmc): this needs to go let chunks = if row_offset == 0 { - vec![wrap_in_listarray(new_empty_array(datatype.clone())).to_boxed()] + vec![DataCell::from_arrow_empty(name, datatype.clone()).as_arrow_monolist()] } else { vec![] }; diff --git a/crates/re_arrow_store/src/test_util.rs b/crates/re_arrow_store/src/test_util.rs index b739d795fd508..870624782a2cf 100644 --- a/crates/re_arrow_store/src/test_util.rs +++ b/crates/re_arrow_store/src/test_util.rs @@ -4,24 +4,24 @@ use crate::DataStoreConfig; #[doc(hidden)] #[macro_export] -macro_rules! test_bundle { - ($entity:ident @ $frames:tt => [$c0:expr $(,)*]) => { - ::re_log_types::msg_bundle::try_build_msg_bundle1( +macro_rules! test_row { + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => { + ::re_log_types::DataRow::from_cells1( ::re_log_types::MsgId::random(), $entity.clone(), $frames, + $n, $c0, ) - .unwrap() }; - ($entity:ident @ $frames:tt => [$c0:expr, $c1:expr $(,)*]) => { - re_log_types::msg_bundle::try_build_msg_bundle2( + ($entity:ident @ $frames:tt => $n:expr; [$c0:expr, $c1:expr $(,)*]) => { + ::re_log_types::DataRow::from_cells2( ::re_log_types::MsgId::random(), $entity.clone(), $frames, + $n, ($c0, $c1), ) - .unwrap() }; } diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index 1d7aa831c3baa..b946ac4fbfd39 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -8,7 +8,7 @@ use arrow2::array::UInt64Array; use rand::Rng; use re_arrow_store::{ - test_bundle, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, WriteError, + test_row, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, WriteError, }; use re_log_types::{ component_types::InstanceKey, @@ -34,12 +34,12 @@ fn write_errors() { } let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let bundle = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_sparse_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::SparseClusteringComponent(_)), )); } @@ -57,22 +57,22 @@ fn write_errors() { let mut store = DataStore::new(InstanceKey::name(), Default::default()); { - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let bundle = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_unsorted_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::InvalidClusteringComponent(_)), )); } { - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let bundle = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [ build_duped_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::InvalidClusteringComponent(_)), )); } @@ -80,12 +80,12 @@ fn write_errors() { { let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ + let bundle = test_row!(ent_path @ + [build_frame_nr(32.into()), build_log_time(Time::now())] => 4; [ build_some_instances(4), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::MismatchedInstances { .. }), )); } @@ -113,11 +113,9 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) { let num_instances = 3; store - .insert( - &test_bundle!(ent_path @ [build_log_time(now), build_frame_nr(frame40)] => [ - build_some_instances(num_instances), - ]), - ) + .insert_row(&test_row!(ent_path @ [ + build_log_time(now), build_frame_nr(frame40), + ] => num_instances; [build_some_instances(num_instances as _)])) .unwrap(); if let err @ Err(_) = store.sanity_check() { @@ -251,8 +249,8 @@ fn range_join_across_single_row_impl(store: &mut DataStore) { let points = build_some_point2d(3); let colors = build_some_colors(3); let bundle = - test_bundle!(ent_path @ [build_frame_nr(42.into())] => [points.clone(), colors.clone()]); - store.insert(&bundle).unwrap(); + test_row!(ent_path @ [build_frame_nr(42.into())] => 3; [points.clone(), colors.clone()]); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = re_arrow_store::RangeQuery::new( @@ -314,10 +312,12 @@ fn gc_correct() { for i in 0..num_ents { let ent_path = EntityPath::from(format!("this/that/{i}")); let num_instances = rng.gen_range(0..=1_000); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ - build_some_colors(num_instances), + let bundle = test_row!(ent_path @ [ + build_frame_nr(frame_nr.into()), + ] => num_instances; [ + build_some_colors(num_instances as _), ]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } } diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 673e1cf4e04fd..d54d6e5612934 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -7,12 +7,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use arrow2::array::{Array, UInt64Array}; +use itertools::Itertools as _; use nohash_hasher::IntMap; use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; use rand::Rng; use re_arrow_store::{ - polars_util, test_bundle, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, + polars_util, test_row, DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, TimeRange, }; use re_log_types::{ @@ -22,10 +23,12 @@ use re_log_types::{ build_some_point2d, build_some_rects, }, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::{wrap_in_listarray, MsgBundle}, - Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline, + Component as _, ComponentName, DataCell, DataRow, DataTable, EntityPath, MsgId, TimeType, + Timeline, }; +// TODO(cmc): introduce batching in the testing matrix + // --- LatestComponentsAt --- #[test] @@ -80,7 +83,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -90,25 +93,23 @@ fn all_components() { Point2D::name(), // added by us Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [ - build_frame_nr(frame1), - ] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [ + let row = test_row!(ent_path @ [ build_frame_nr(frame2), - ] => [build_some_rects(2), build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + ] => 2; [build_some_rects(2), build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -148,7 +149,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -158,26 +159,26 @@ fn all_components() { Rect2D::name(), // ⚠ inherited before the buckets got split apart! Point2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_instances(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_instances(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -220,7 +221,7 @@ fn all_components() { ColorRGBA::name(), // added by us, timeless Rect2D::name(), // added by us cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; @@ -230,31 +231,31 @@ fn all_components() { Point2D::name(), // added by us but not contained in the second bucket Rect2D::name(), // added by use cluster_key, // always here - MsgId::name(), // automatically appended by MsgBundle + MsgId::name(), // automatically appended by DataTable #[cfg(debug_assertions)] DataStore::insert_id_key(), // automatically added in debug ]; - let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [] => 2; [build_some_colors(2)]); + store.insert_row(&row).unwrap(); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 2; [build_some_rects(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_point2d(2)]); + store.insert_row(&row).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -295,33 +296,32 @@ fn latest_at_impl(store: &mut DataStore) { let frame3: TimeInt = 3.into(); let frame4: TimeInt = 4.into(); - // helper to insert a bundle both as a temporal and timeless payload - let insert = |store: &mut DataStore, bundle| { + // helper to insert a row both as a temporal and timeless payload + let insert = |store: &mut DataStore, row| { // insert temporal - store.insert(bundle).unwrap(); + store.insert_row(row).unwrap(); // insert timeless - let mut bundle_timeless = bundle.clone(); - bundle_timeless.time_point = Default::default(); - store.insert(&bundle_timeless).unwrap(); + let mut row_timeless = (*row).clone(); + row_timeless.timepoint = Default::default(); + store.insert_row(&row_timeless).unwrap(); }; let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); - let bundle1 = - test_bundle!(ent_path @ [build_frame_nr(frame1)] => [instances1.clone(), colors1]); - insert(store, &bundle1); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]); + insert(store, &row1); let points2 = build_some_point2d(3); - let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [instances1, points2]); - insert(store, &bundle2); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]); + insert(store, &row2); let points3 = build_some_point2d(10); - let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); - insert(store, &bundle3); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]); + insert(store, &row3); let colors4 = build_some_colors(5); - let bundle4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [colors4]); - insert(store, &bundle4); + let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [colors4]); + insert(store, &row4); if let err @ Err(_) = store.sanity_check() { store.sort_indices_if_needed(); @@ -329,50 +329,49 @@ fn latest_at_impl(store: &mut DataStore) { err.unwrap(); } - let mut assert_latest_components = - |frame_nr: TimeInt, bundles: &[(ComponentName, &MsgBundle)]| { - let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); - let components_all = &[ColorRGBA::name(), Point2D::name()]; + let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let components_all = &[ColorRGBA::name(), Point2D::name()]; - let df = polars_util::latest_components( - store, - &LatestAtQuery::new(timeline_frame_nr, frame_nr), - &ent_path, - components_all, - &JoinType::Outer, - ) - .unwrap(); + let df = polars_util::latest_components( + store, + &LatestAtQuery::new(timeline_frame_nr, frame_nr), + &ent_path, + components_all, + &JoinType::Outer, + ) + .unwrap(); - let df_expected = joint_df(store.cluster_key(), bundles); + let df_expected = joint_df(store.cluster_key(), rows); - store.sort_indices_if_needed(); - assert_eq!(df_expected, df, "{store}"); - }; + store.sort_indices_if_needed(); + assert_eq!(df_expected, df, "{store}"); + }; // TODO(cmc): bring back some log_time scenarios assert_latest_components( frame0, - &[(ColorRGBA::name(), &bundle4), (Point2D::name(), &bundle3)], // timeless + &[(ColorRGBA::name(), &row4), (Point2D::name(), &row3)], // timeless ); assert_latest_components( frame1, &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle3), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row3), // timeless ], ); assert_latest_components( frame2, - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle2)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row2)], ); assert_latest_components( frame3, - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row3)], ); assert_latest_components( frame4, - &[(ColorRGBA::name(), &bundle4), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4), (Point2D::name(), &row3)], ); } @@ -400,54 +399,52 @@ fn range_impl(store: &mut DataStore) { let frame4: TimeInt = 4.into(); let frame5: TimeInt = 5.into(); - // helper to insert a bundle both as a temporal and timeless payload - let insert = |store: &mut DataStore, bundle| { + // helper to insert a row both as a temporal and timeless payload + let insert = |store: &mut DataStore, row| { // insert temporal - store.insert(bundle).unwrap(); + store.insert_row(row).unwrap(); // insert timeless - let mut bundle_timeless = bundle.clone(); - bundle_timeless.time_point = Default::default(); - store.insert(&bundle_timeless).unwrap(); + let mut row_timeless = (*row).clone(); + row_timeless.timepoint = Default::default(); + store.insert_row(&row_timeless).unwrap(); }; let insts1 = build_some_instances(3); let colors1 = build_some_colors(3); - let bundle1 = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [insts1.clone(), colors1]); - insert(store, &bundle1); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [insts1.clone(), colors1]); + insert(store, &row1); let points2 = build_some_point2d(3); - let bundle2 = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [insts1, points2]); - insert(store, &bundle2); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [insts1, points2]); + insert(store, &row2); let points3 = build_some_point2d(10); - let bundle3 = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [points3]); - insert(store, &bundle3); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]); + insert(store, &row3); let insts4_1 = build_some_instances_from(20..25); let colors4_1 = build_some_colors(5); - let bundle4_1 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_1, colors4_1]); - insert(store, &bundle4_1); + let row4_1 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_1, colors4_1]); + insert(store, &row4_1); let insts4_2 = build_some_instances_from(25..30); let colors4_2 = build_some_colors(5); - let bundle4_2 = - test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_2.clone(), colors4_2]); - insert(store, &bundle4_2); + let row4_2 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_2.clone(), colors4_2]); + insert(store, &row4_2); let points4_25 = build_some_point2d(5); - let bundle4_25 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_2, points4_25]); - insert(store, &bundle4_25); + let row4_25 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_2, points4_25]); + insert(store, &row4_25); let insts4_3 = build_some_instances_from(30..35); let colors4_3 = build_some_colors(5); - let bundle4_3 = - test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_3.clone(), colors4_3]); - insert(store, &bundle4_3); + let row4_3 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_3.clone(), colors4_3]); + insert(store, &row4_3); let points4_4 = build_some_point2d(5); - let bundle4_4 = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [insts4_3, points4_4]); - insert(store, &bundle4_4); + let row4_4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [insts4_3, points4_4]); + insert(store, &row4_4); if let err @ Err(_) = store.sanity_check() { store.sort_indices_if_needed(); @@ -455,7 +452,7 @@ fn range_impl(store: &mut DataStore) { err.unwrap(); } - // Each entry in `bundles_at_times` corresponds to a dataframe that's expected to be returned + // Each entry in `rows_at_times` corresponds to a dataframe that's expected to be returned // by the range query. // A single timepoint might have several of those! That's one of the behaviors specific to // range queries. @@ -463,16 +460,16 @@ fn range_impl(store: &mut DataStore) { let mut assert_range_components = |time_range: TimeRange, components: [ComponentName; 2], - bundles_at_times: &[(Option, &[(ComponentName, &MsgBundle)])]| { + rows_at_times: &[(Option, &[(ComponentName, &DataRow)])]| { let mut expected_timeless = Vec::::new(); let mut expected_at_times: IntMap> = Default::default(); - for (time, bundles) in bundles_at_times { + for (time, rows) in rows_at_times { if let Some(time) = time { let dfs = expected_at_times.entry(*time).or_default(); - dfs.push(joint_df(store.cluster_key(), bundles)); + dfs.push(joint_df(store.cluster_key(), rows)); } else { - expected_timeless.push(joint_df(store.cluster_key(), bundles)); + expected_timeless.push(joint_df(store.cluster_key(), rows)); } } @@ -511,7 +508,7 @@ fn range_impl(store: &mut DataStore) { dfs_processed += 1; } - let dfs_processed_expected = bundles_at_times.len(); + let dfs_processed_expected = rows_at_times.len(); assert_eq!(dfs_processed_expected, dfs_processed); }; @@ -525,16 +522,13 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], ), // timeless ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ], @@ -546,8 +540,8 @@ fn range_impl(store: &mut DataStore) { ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), // ], @@ -558,7 +552,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame2), - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle2)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row2)], ), // ], ); @@ -568,22 +562,19 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame3), - &[(ColorRGBA::name(), &bundle1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -593,10 +584,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], // !!! ), // ], ); @@ -609,10 +597,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // timeless ], ); @@ -623,13 +608,13 @@ fn range_impl(store: &mut DataStore) { ( Some(frame1), &[ - (Point2D::name(), &bundle4_4), // timeless - (ColorRGBA::name(), &bundle1), + (Point2D::name(), &row4_4), // timeless + (ColorRGBA::name(), &row1), ], ), ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), // ], ); @@ -639,11 +624,11 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ], ); @@ -653,21 +638,15 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -677,10 +656,7 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // ], ); @@ -693,32 +669,26 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_4), - ], + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_4)], ), // timeless ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -731,32 +701,23 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame0), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), // timeless ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -767,43 +728,37 @@ fn range_impl(store: &mut DataStore) { TimeRange::new(TimeInt::MIN, TimeInt::MAX), [ColorRGBA::name(), Point2D::name()], &[ - (None, &[(ColorRGBA::name(), &bundle1)]), + (None, &[(ColorRGBA::name(), &row1)]), ( None, - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( None, - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( None, - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ( Some(frame1), &[ - (ColorRGBA::name(), &bundle1), - (Point2D::name(), &bundle4_4), // timeless + (ColorRGBA::name(), &row1), + (Point2D::name(), &row4_4), // timeless ], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_1), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_1), (Point2D::name(), &row3)], ), ( Some(frame4), - &[(ColorRGBA::name(), &bundle4_2), (Point2D::name(), &bundle3)], + &[(ColorRGBA::name(), &row4_2), (Point2D::name(), &row3)], ), ( Some(frame4), - &[ - (ColorRGBA::name(), &bundle4_3), - (Point2D::name(), &bundle4_25), - ], // !!! + &[(ColorRGBA::name(), &row4_3), (Point2D::name(), &row4_25)], // !!! ), ], ); @@ -816,47 +771,35 @@ fn range_impl(store: &mut DataStore) { &[ ( None, - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( None, - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( None, - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( None, - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ( Some(frame2), - &[(Point2D::name(), &bundle2), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row2), (ColorRGBA::name(), &row1)], ), ( Some(frame3), - &[(Point2D::name(), &bundle3), (ColorRGBA::name(), &bundle1)], + &[(Point2D::name(), &row3), (ColorRGBA::name(), &row1)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_25), - (ColorRGBA::name(), &bundle4_2), - ], + &[(Point2D::name(), &row4_25), (ColorRGBA::name(), &row4_2)], ), ( Some(frame4), - &[ - (Point2D::name(), &bundle4_4), - (ColorRGBA::name(), &bundle4_3), - ], + &[(Point2D::name(), &row4_4), (ColorRGBA::name(), &row4_3)], ), ], ); @@ -864,37 +807,29 @@ fn range_impl(store: &mut DataStore) { // --- Common helpers --- -/// Given a list of bundles, crafts a `latest_components`-looking dataframe. -fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) -> DataFrame { - let df = bundles +/// Given a list of rows, crafts a `latest_components`-looking dataframe. +fn joint_df(cluster_key: ComponentName, rows: &[(ComponentName, &DataRow)]) -> DataFrame { + let df = rows .iter() - .map(|(component, bundle)| { - let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) { - Series::try_from(( - cluster_key.as_str(), - wrap_in_listarray(bundle.cells[idx].as_arrow()).to_boxed(), - )) - .unwrap() + .map(|(component, row)| { + let cluster_comp = if let Some(idx) = row.find_component(&cluster_key) { + Series::try_from((cluster_key.as_str(), row.cells[idx].as_arrow_monolist())) + .unwrap() } else { - let num_instances = bundle.num_instances(); + let num_instances = row.num_instances(); Series::try_from(( cluster_key.as_str(), - wrap_in_listarray( - UInt64Array::from_vec((0..num_instances as u64).collect()).to_boxed(), - ) - .to_boxed(), + DataCell::from_component::(0..num_instances as u64) + .as_arrow_monolist(), )) .unwrap() }; - let comp_idx = bundle.find_component(component).unwrap(); + let comp_idx = row.find_component(component).unwrap(); let df = DataFrame::new(vec![ cluster_comp, - Series::try_from(( - component.as_str(), - wrap_in_listarray(bundle.cells[comp_idx].as_arrow()).to_boxed(), - )) - .unwrap(), + Series::try_from((component.as_str(), row.cells[comp_idx].as_arrow_monolist())) + .unwrap(), ]) .unwrap(); @@ -935,10 +870,12 @@ fn gc_impl(store: &mut DataStore) { let frames = (0..num_frames).filter(|_| rand::thread_rng().gen()); for frame_nr in frames { let num_instances = rng.gen_range(0..=1_000); - let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ - build_some_rects(num_instances), + let row = test_row!(ent_path @ [ + build_frame_nr(frame_nr.into()) + ] => num_instances; [ + build_some_rects(num_instances as _), ]); - store.insert(&bundle).unwrap(); + store.insert_row(&row).unwrap(); } } diff --git a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index ef03e9ec373d3..b81e09600a3d7 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig}; use re_log_types::{ component_types::InstanceKey, datagen::{build_frame_nr, build_some_instances}, - msg_bundle::MsgBundle, - Component as _, EntityPath, MsgId, TimePoint, + Component as _, DataRow, EntityPath, MsgId, TimePoint, }; // --- Internals --- @@ -51,23 +50,25 @@ fn pathological_bucket_topology() { let ent_path = EntityPath::from("this/that"); let num_instances = 1; - let time_point = TimePoint::from([build_frame_nr(frame_nr.into())]); + let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); for _ in 0..num { - let msg = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point.clone(), - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint.clone(), + num_instances, + build_some_instances(num_instances as _), ); - store_forward.insert(&msg).unwrap(); + store_forward.insert_row(&row).unwrap(); - let msg = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point.clone(), - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint.clone(), + num_instances, + build_some_instances(num_instances as _), ); - store_backward.insert(&msg).unwrap(); + store_backward.insert_row(&row).unwrap(); } } @@ -79,24 +80,26 @@ fn pathological_bucket_topology() { let ent_path = EntityPath::from("this/that"); let num_instances = 1; - let msgs = range + // TODO(cmc): to update once batching lands + let rows = range .map(|frame_nr| { - let time_point = TimePoint::from([build_frame_nr(frame_nr.into())]); - MsgBundle::new( + let timepoint = TimePoint::from([build_frame_nr(frame_nr.into())]); + DataRow::from_cells1( MsgId::ZERO, ent_path.clone(), - time_point, - vec![build_some_instances(num_instances).try_into().unwrap()], + timepoint, + num_instances, + build_some_instances(num_instances as _), ) }) .collect::>(); - msgs.iter() - .for_each(|msg| store_forward.insert(msg).unwrap()); + rows.iter() + .for_each(|row| store_forward.insert_row(row).unwrap()); - msgs.iter() + rows.iter() .rev() - .for_each(|msg| store_backward.insert(msg).unwrap()); + .for_each(|row| store_backward.insert_row(row).unwrap()); } store_repeated_frame(1000, 10, &mut store_forward, &mut store_backward); diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 29a8386e3dc48..9b023d4203667 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -48,7 +48,7 @@ fn live_bytes() -> usize { // ---------------------------------------------------------------------------- -use re_log_types::{entity_path, MsgId}; +use re_log_types::{entity_path, DataRow, DataTable, MsgId}; fn main() { log_messages(); @@ -57,7 +57,6 @@ fn main() { fn log_messages() { use re_log_types::{ datagen::{build_frame_nr, build_some_point2d}, - msg_bundle::try_build_msg_bundle1, ArrowMsg, LogMsg, TimeInt, TimePoint, Timeline, }; @@ -107,13 +106,17 @@ fn log_messages() { { let used_bytes_start = live_bytes(); let msg_bundle = Box::new( - try_build_msg_bundle1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - build_some_point2d(1), + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + build_some_point2d(1), + )], ) - .unwrap(), + .into_msg_bundle(), ); let msg_bundle_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); @@ -129,13 +132,17 @@ fn log_messages() { { let used_bytes_start = live_bytes(); let msg_bundle = Box::new( - try_build_msg_bundle1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - build_some_point2d(NUM_POINTS), + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + NUM_POINTS as _, + build_some_point2d(NUM_POINTS), + )], ) - .unwrap(), + .into_msg_bundle(), ); let msg_bundle_bytes = live_bytes() - used_bytes_start; let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index c1552ebb99d90..64ea1ce9aa46e 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -4,9 +4,9 @@ use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt}; use re_log_types::{ component_types::InstanceKey, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle, - ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, EntityPath, - EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, - Timeline, + ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, + EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, + TimePoint, Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -78,42 +78,52 @@ impl EntityDb { fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?; + let table = DataTable::from_msg_bundle(msg_bundle); - for (&timeline, &time_int) in msg_bundle.time_point.iter() { + // TODO(cmc): batching + for row in table.as_rows() { + // TODO: should that fail? + self.try_add_data_row(&row)?; + } + + Ok(()) + } + + fn try_add_data_row(&mut self, row: &DataRow) -> Result<(), Error> { + for (&timeline, &time_int) in row.timepoint().iter() { self.times_per_timeline.insert(timeline, time_int); } - self.register_entity_path(&msg_bundle.entity_path); + self.register_entity_path(&row.entity_path); - for component in &msg_bundle.cells { - let component_path = - ComponentPath::new(msg_bundle.entity_path.clone(), component.name()); - if component.name() == MsgId::name() { + // TODO(cmc): batching + for cell in row.cells() { + let component_path = ComponentPath::new(row.entity_path().clone(), cell.component()); + if cell.component() == MsgId::name() { continue; } - let pending_clears = self - .tree - .add_data_msg(&msg_bundle.time_point, &component_path); + let pending_clears = self.tree.add_data_msg(row.timepoint(), &component_path); for (msg_id, time_point) in pending_clears { // Create and insert an empty component into the arrow store // TODO(jleibs): Faster empty-array creation - let cell = - DataCell::from_arrow_empty(component.name(), component.datatype().clone()); - let msg_bundle = MsgBundle::new( + let cell = DataCell::from_arrow_empty(cell.component(), cell.datatype().clone()); + + let row = DataRow::from_cells1( msg_id, - msg_bundle.entity_path.clone(), + row.entity_path.clone(), time_point.clone(), - vec![cell], + cell.len(), + cell, ); - self.data_store.insert(&msg_bundle).ok(); + self.data_store.insert_row(&row).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(&time_point, &component_path); } } - self.data_store.insert(&msg_bundle).map_err(Into::into) + self.data_store.insert_row(row).map_err(Into::into) } fn add_path_op(&mut self, msg_id: MsgId, time_point: &TimePoint, path_op: &PathOp) { @@ -128,13 +138,14 @@ impl EntityDb { // TODO(jleibs): Faster empty-array creation let cell = DataCell::from_arrow_empty(component_path.component_name, data_type.clone()); - let msg_bundle = MsgBundle::new( + let row = DataRow::from_cells1( msg_id, component_path.entity_path.clone(), time_point.clone(), - vec![cell], + cell.len(), + cell, ); - self.data_store.insert(&msg_bundle).ok(); + self.data_store.insert_row(&row).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(time_point, &component_path); } diff --git a/crates/re_log_types/benches/msg_encode_benchmark.rs b/crates/re_log_types/benches/msg_encode_benchmark.rs index 01812f74ddc2b..849e82399d2ca 100644 --- a/crates/re_log_types/benches/msg_encode_benchmark.rs +++ b/crates/re_log_types/benches/msg_encode_benchmark.rs @@ -7,8 +7,8 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, entity_path, - msg_bundle::{try_build_msg_bundle2, MsgBundle}, - ArrowMsg, Index, LogMsg, MsgId, + msg_bundle::MsgBundle, + ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -63,13 +63,17 @@ fn mono_points_arrow(c: &mut Criterion) { fn generate_message_bundles() -> Vec { (0..NUM_POINTS) .map(|i| { - try_build_msg_bundle2( + DataTable::from_rows( MsgId::ZERO, - entity_path!("points", Index::Sequence(i as _)), - [build_frame_nr(0.into())], - (build_some_point2d(1), build_some_colors(1)), + [DataRow::from_cells2( + MsgId::ZERO, + entity_path!("points", Index::Sequence(i as _)), + [build_frame_nr(0.into())], + 1, + (build_some_point2d(1), build_some_colors(1)), + )], ) - .unwrap() + .into_msg_bundle() }) .collect() } @@ -115,16 +119,22 @@ fn mono_points_arrow(c: &mut Criterion) { fn batch_points_arrow(c: &mut Criterion) { fn generate_message_bundles() -> Vec { - vec![try_build_msg_bundle2( - MsgId::ZERO, - entity_path!("points"), - [build_frame_nr(0.into())], - ( - build_some_point2d(NUM_POINTS), - build_some_colors(NUM_POINTS), - ), - ) - .unwrap()] + vec![ + DataTable::from_rows( + MsgId::ZERO, + [DataRow::from_cells2( + MsgId::ZERO, + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + ( + build_some_point2d(NUM_POINTS), + build_some_colors(NUM_POINTS), + ), + )], + ) + .into_msg_bundle(), // + ] } { diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index b4400bb09d3ec..83aa697e4cabc 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -117,7 +117,7 @@ mod tests { use super::{ArrowMsg, Chunk, MsgId, Schema}; use crate::{ datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - msg_bundle::try_build_msg_bundle2, + DataRow, }; #[test] @@ -162,16 +162,20 @@ mod tests { #[test] fn test_roundtrip_payload() { - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::ZERO, "world/rects", [build_frame_nr(0.into())], + 1, (build_some_point2d(1), build_some_rects(1)), - ) - .unwrap(); + ); + + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); // TODO(cmc): that's not a full roundtrip though - let msg_in: ArrowMsg = bundle.try_into().unwrap(); + let msg_in: ArrowMsg = msg_bundle.try_into().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); assert_eq!(msg_in, msg_out); diff --git a/crates/re_log_types/src/component_types/instance_key.rs b/crates/re_log_types/src/component_types/instance_key.rs index b2a789f0e4ee8..869df1936fa4f 100644 --- a/crates/re_log_types/src/component_types/instance_key.rs +++ b/crates/re_log_types/src/component_types/instance_key.rs @@ -70,6 +70,12 @@ impl std::fmt::Display for InstanceKey { } } +impl From for InstanceKey { + fn from(value: u64) -> Self { + Self(value) + } +} + impl Component for InstanceKey { #[inline] fn name() -> crate::ComponentName { diff --git a/crates/re_log_types/src/component_types/tensor.rs b/crates/re_log_types/src/component_types/tensor.rs index eef50e912ca17..ec57d59a1265e 100644 --- a/crates/re_log_types/src/component_types/tensor.rs +++ b/crates/re_log_types/src/component_types/tensor.rs @@ -750,12 +750,12 @@ fn test_arrow() { #[test] fn test_concat_and_slice() { - use crate::msg_bundle::wrap_in_listarray; use arrow2::array::ListArray; use arrow2::compute::concatenate::concatenate; - use arrow2_convert::{deserialize::TryIntoCollection, serialize::TryIntoArrow}; - let tensor1 = vec![Tensor { + use crate::DataCell; + + let tensor1 = Tensor { tensor_id: TensorId::random(), shape: vec![TensorDimension { size: 4, @@ -764,9 +764,9 @@ fn test_concat_and_slice() { data: TensorData::JPEG(vec![1, 2, 3, 4].into()), meaning: TensorDataMeaning::Unknown, meter: Some(1000.0), - }]; + }; - let tensor2 = vec![Tensor { + let tensor2 = Tensor { tensor_id: TensorId::random(), shape: vec![TensorDimension { size: 4, @@ -775,12 +775,10 @@ fn test_concat_and_slice() { data: TensorData::JPEG(vec![5, 6, 7, 8].into()), meaning: TensorDataMeaning::Unknown, meter: None, - }]; + }; - let array1: Box = tensor1.iter().try_into_arrow().unwrap(); - let list1 = wrap_in_listarray(array1).boxed(); - let array2: Box = tensor2.iter().try_into_arrow().unwrap(); - let list2 = wrap_in_listarray(array2).boxed(); + let list1 = DataCell::from_native(&[tensor1.clone()]).as_arrow_monolist(); + let list2 = DataCell::from_native(&[tensor2.clone()]).as_arrow_monolist(); let pre_concat = list1 .as_any() @@ -788,8 +786,10 @@ fn test_concat_and_slice() { .unwrap() .value(0); - let tensor_out: Vec = TryIntoCollection::try_into_collection(pre_concat).unwrap(); - + let tensor_out = DataCell::from_arrow(Tensor::name(), pre_concat) + .as_native::() + .next() + .unwrap(); assert_eq!(tensor1, tensor_out); let concat = concatenate(&[list1.as_ref(), list2.as_ref()]).unwrap(); @@ -800,7 +800,10 @@ fn test_concat_and_slice() { .unwrap() .value(1); - let tensor_out: Vec = TryIntoCollection::try_into_collection(slice).unwrap(); + let tensor_out = DataCell::from_arrow(Tensor::name(), slice) + .as_native::() + .next() + .unwrap(); - assert_eq!(tensor2[0], tensor_out[0]); + assert_eq!(tensor2, tensor_out); } diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index 0101cbd642ac4..aec6cdcbff0eb 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -1,9 +1,12 @@ -use arrow2_convert::deserialize::arrow_array_deserialize_iterator; +use itertools::Itertools as _; -use crate::{ - external::arrow2_convert::serialize::TryIntoArrow, msg_bundle::wrap_in_listarray, Component, - ComponentName, DeserializableComponent, SerializableComponent, -}; +use crate::{Component, ComponentName, DeserializableComponent, SerializableComponent}; + +// TODO: any profile scopes needed? +// TODO: overall this needs more aggressive checks to make sure arrays are always leaves + +// TODO: concatenating cells etc should be provided here as high-level operations, dont use raw +// arrow arrays // --- @@ -20,21 +23,55 @@ pub type DataCellResult = ::std::result::Result; // --- -/// A cell's worth of data, i.e. a uniform list of values for a given component type: `[C, C, ..]`. +/// A cell's worth of data, i.e. a uniform array of values for a given component type. /// /// A `DataCell` can be constructed from either an iterable of native `Component`s or directly -/// from an arrow `StructArray`. +/// from a slice of arrow data. +/// +/// Internally, the data is stored in an erased, contiguous arrow array on the heap. +/// Cloning a `DataCell` is cheap (shallow, ref-counted). +/// +/// # Layout /// -/// Cloning a `DataCell` is cheap (shallow). +/// A cell is an array of component instances: `[C, C, C, ...]`. +/// +/// Consider this example: +/// ```ignore +/// let points: &[Point2D] = &[ +/// [10.0, 10.0].into(), +/// [20.0, 20.0].into(), +/// [30.0, 30.0].into(), +/// ]; +/// let cell = DataCell::from(points); +/// // Or, alternatively: +/// let cell = DataCell::from_component::([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]); +/// ``` +/// +/// The cell's datatype is now a `StructArray`: +/// ```ignore +/// Struct([ +/// Field { name: "x", data_type: Float32, is_nullable: false, metadata: {} }, +/// Field { name: "y", data_type: Float32, is_nullable: false, metadata: {} }, +/// ]) +/// ``` +/// +/// This effectively makes up a cell, in the context of a larger table: +/// ```ignore +/// ┌──────────────────────────────────────────────────┐ +/// │ rerun.point2d │ +/// ╞══════════════════════════════════════════════════╡ +/// │ [{x: 10, y: 10}, {x: 20, y: 20}, {x: 30, y: 30}] │ +/// └──────────────────────────────────────────────────┘ +/// ``` #[derive(Debug, Clone)] pub struct DataCell { /// Name of the component type used in this cell. // - // TODO(cmc): We should consider storing this information within the values array itself, + // TODO(cmc): We need to store this extra information within the values array itself, // rather than outside of it. Arrow has the concept of extensions specifically for storing // type metadata, but we have had some issues with it in the past. This is an opportunity to // revisit and improve upon that implementation. - name: ComponentName, + pub(crate) name: ComponentName, /// A uniformly typed list of values for the given component type: `[C, C, C, ...]` /// @@ -43,13 +80,9 @@ pub struct DataCell { /// /// Internally this is always stored as an erased arrow array to avoid bad surprises with /// frequent boxing/unboxing down the line. - // - // TODO(cmc): Today this _has_ to be a StructArray, but that's a probably an issue in the - // making. - values: Box, + pub(crate) values: Box, } -// Constructors impl DataCell { /// Builds a new `DataCell` from a uniform list of native component values. /// @@ -61,6 +94,7 @@ impl DataCell { pub fn try_from_native<'a, C: SerializableComponent>( values: impl IntoIterator, ) -> DataCellResult { + use arrow2_convert::serialize::TryIntoArrow; Ok(Self::from_arrow( C::name(), TryIntoArrow::try_into_arrow(values.into_iter())?, @@ -154,6 +188,7 @@ impl DataCell { // part of the metadata by using an extension. #[inline] pub fn from_arrow_empty(name: ComponentName, datatype: arrow2::datatypes::DataType) -> Self { + // TODO: don't unwrap the error is awful Self::try_from_arrow_empty(name, datatype).unwrap() } @@ -171,23 +206,51 @@ impl DataCell { &*self.values } + /// Returns the contents of the cell as an arrow array (shallow clone) wrapped in a unit-length + /// list-array. + /// + /// Useful when dealing with cells of different lengths in context that don't allow for it. + /// + /// * Before: `[C, C, C, ...]` + /// * After: `ListArray[ [C, C, C, C] ]` + // + // TODO(cmc): this shouldn't be public, need to make private once the store is fixed. + #[doc(hidden)] + #[inline] + pub fn as_arrow_monolist(&self) -> Box { + use arrow2::{array::ListArray, offset::Offsets}; + + let values = self.as_arrow(); + let datatype = self.datatype().clone(); + + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(std::iter::once(self.len() as usize)) + .unwrap() + .into(); + let validity = None; + + ListArray::::new(datatype, offsets, values, validity).boxed() + } + /// Returns the contents of the cell as an iterator of native components. /// - /// Fails + // TODO: Fails + // + // TODO(cmc): There shouldn't need to be HRTBs here. #[inline] pub fn as_native(&self) -> impl Iterator + '_ where for<'a> &'a C::ArrayType: IntoIterator, { + use arrow2_convert::deserialize::arrow_array_deserialize_iterator; arrow_array_deserialize_iterator(&*self.values).unwrap() } } -// Getters impl DataCell { /// The name of the component type stored in the cell. #[inline] - pub fn name(&self) -> ComponentName { + pub fn component(&self) -> ComponentName { self.name } @@ -198,9 +261,10 @@ impl DataCell { } /// The length of the cell's array, i.e. how many component instances are in the cell? + // TODO: num_instances? #[inline] - pub fn len(&self) -> usize { - self.values.len() + pub fn len(&self) -> u32 { + self.values.len() as _ } /// See [`Self::len`]. @@ -212,6 +276,8 @@ impl DataCell { // --- +// TODO(cmc): this should be `C: Component`, nothing else. + impl From<&[C]> for DataCell { #[inline] fn from(values: &[C]) -> Self { @@ -235,12 +301,28 @@ impl From<&Vec> for DataCell { // --- +impl DataCell { + /// Builds a cell from an iterable of items that can be turned into a [`Component`]. + /// + /// ⚠ This requires allocating due to quirks in `arrow2-convert`, prefer [`Self::from_native`] + /// when performance matters. + pub fn from_component(values: impl IntoIterator>) -> Self + where + C: SerializableComponent, + { + let values = values.into_iter().map(Into::into).collect_vec(); + Self::from_native(values.iter()) + } +} + +// --- + impl std::fmt::Display for DataCell { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let table = re_format::arrow::format_table( - // wrap in a ListArray so that it looks more cell-like (i.e. single row) - [&*wrap_in_listarray(self.as_arrow()).boxed()], - [self.name()], + // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. single row) + [&*self.as_arrow_monolist()], + [self.component()], ); table.fmt(f) } @@ -256,18 +338,20 @@ fn data_cell() { use crate::component_types::Point2D; - let points = &[ - Point2D::new(10.0, 10.0), - Point2D::new(20.0, 20.0), - Point2D::new(30.0, 30.0), + let points: &[Point2D] = &[ + [10.0, 10.0].into(), + [20.0, 20.0].into(), + [30.0, 30.0].into(), ]; + let _cell = DataCell::from(points); - let cell = DataCell::from_native(points); + // Or, alternatively: + let cell = DataCell::from_component::([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]); eprintln!("{:#?}", cell.datatype()); eprintln!("{cell}"); - assert_eq!(Point2D::name(), cell.name()); + assert_eq!(Point2D::name(), cell.component()); assert_eq!(3, cell.len()); assert_eq!(cell.datatype(), &Point2D::data_type()); diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs new file mode 100644 index 0000000000000..9a3368f2d185c --- /dev/null +++ b/crates/re_log_types/src/data_row.rs @@ -0,0 +1,418 @@ +use ahash::HashSetExt; +use itertools::Itertools as _; +use nohash_hasher::IntSet; + +use crate::{Component, ComponentName, DataCell, DataTable, EntityPath, MsgId, TimePoint}; + +// TODO: any profile scopes needed? + +// --- + +#[derive(thiserror::Error, Debug)] +pub enum DataRowError { + #[error("Could not serialize/deserialize component instances to/from Arrow")] + Arrow(#[from] arrow2::error::Error), + + // Needed to handle TryFrom -> T + #[error("Infallible")] + Unreachable(#[from] std::convert::Infallible), +} + +pub type DataRowResult = ::std::result::Result; + +// --- + +// TODO: note that a row is by definition a bunch of independent / non-contiguous memory blocks. + +// TODO: explain that fields being public is only for destructuration, anything else is UB + +/// A row's worth of data, i.e. an event: a list of [`DataCell`]s associated with an auto-generated +/// [`EventId`], a user-specified [`TimePoint`] and [`EntityPath`], and an expected number of +/// instances. +/// +/// A `DataRow` can only be constructed from a collection of [`DataCell`]s. +/// +/// # Layout +/// +/// A row is a collection of cells where each cell must either be empty (a clear), unit-lengthed +/// (a splat) or [`num_instances`] long (standard): `[[C1, C1, C1], [], [C3], [C4, C4, C4], ...]`. +/// +/// Consider this example: +/// ```ignore +/// let num_instances = 2; +/// let points: &[_] = &[ +/// Point2D::new(10.0, 10.0), // +/// Point2D::new(20.0, 20.0), +/// ]; +/// let colors: &[_] = &[ +/// ColorRGBA::from_rgb(128, 128, 128), // +/// ]; +/// let labels: &[Label] = &[]; +/// +/// let cells = [points.into(), colors.into(), labels.into()]; +/// let row = DataRow::from_cells(event_id, timepoint, ent_path, num_instances, cells); +/// ``` +/// +/// A row has no arrow representation nor datatype of its own, as it is merely a connection of +/// independent cells. +/// +/// This effectively makes up a row, in the context of a larger table: +/// ```ignore +/// ┌──────────────────────────────────┬─────────────────┬─────────────┐ +/// │ rerun.point2d ┆ rerun.colorrgba ┆ rerun.label │ +/// │ --- ┆ --- ┆ --- │ +/// │ list[struct[2]] ┆ list[u32] ┆ list[str] │ +/// ╞══════════════════════════════════╪═════════════════╪═════════════╡ +/// │ [{x: 10, y: 10}, {x: 20, y: 20}] ┆ [2155905279] ┆ [] │ +/// └──────────────────────────────────┴─────────────────┴─────────────┘ +/// ``` +#[derive(Debug, Clone)] +pub struct DataRow { + /// Auto-generated [`TUID`], uniquely identifying this event and keeping track of the client's + /// wall-clock. + // TODO(cmc): introduce EventId & BatchId + pub row_id: MsgId, + + /// User-specified [`TimePoint`] for this event. + pub timepoint: TimePoint, + + /// User-specified [`EntityPath`] for this event. + pub entity_path: EntityPath, + + /// The expected number of values (== component instances) in each cell. + /// + /// Each cell must have either: + /// - 0 instance (clear), + /// - 1 instance (splat), + /// - `num_instances` instances (standard). + pub num_instances: u32, + + /// The actual cells (== columns, == components). + pub cells: Vec, +} + +impl DataRow { + /// Builds a new `DataRow` from an iterable of [`DataCell`]s. + /// + /// Fails if: + /// - one or more cell isn't 0, 1 or `num_instances` long, + /// - two or more cells share the same component type. + // + // TODO: panicky version + #[inline] + pub fn from_cells( + row_id: MsgId, + timepoint: impl Into, + entity_path: impl Into, + num_instances: u32, + cells: impl IntoIterator, + ) -> Self { + let cells = cells.into_iter().collect_vec(); + + let mut components = IntSet::with_capacity(cells.len()); + for cell in &cells { + if !components.insert(cell.component()) { + assert!(false, "duped component {}", cell.component()); // TODO + } + } + + let mut this = Self { + row_id, + entity_path: entity_path.into(), + timepoint: timepoint.into(), + num_instances, + cells, + }; + + // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, + // we need to craft an array of `MsgId`s that matches the length of the other components. + // TODO(cmc): This goes away with batching & al + if !components.contains(&MsgId::name()) { + this.cells.push(DataCell::from_native( + vec![row_id; this.num_instances() as _].iter(), + )); + } + + this + } + + /// Append a cell to an existing row. + /// + /// Returns an error if the cell is not compatible with the row. + // + // TODO: fails if? + // TODO: panicky version + #[inline] + pub fn append_cell(&mut self, cell: DataCell) -> anyhow::Result<()> { + // TODO: check that the row is legal + + self.cells.push(cell); + + Ok(()) + } + + #[inline] + pub fn into_table(self, table_id: MsgId) -> DataTable { + DataTable::from_rows(table_id, [self]) + } +} + +impl DataRow { + #[inline] + pub fn row_id(&self) -> MsgId { + self.row_id + } + + #[inline] + pub fn timepoint(&self) -> &TimePoint { + &self.timepoint + } + + #[inline] + pub fn entity_path(&self) -> &EntityPath { + &self.entity_path + } + + #[inline] + pub fn num_components(&self) -> usize { + self.cells.len() + } + + #[inline] + pub fn components(&self) -> impl ExactSizeIterator + '_ { + self.cells.iter().map(|cell| cell.component()) + } + + #[inline] + pub fn num_instances(&self) -> u32 { + self.num_instances + } + + #[inline] + pub fn cells(&self) -> &[DataCell] { + &self.cells + } + + #[inline] + pub fn into_cells(self) -> Vec { + self.cells + } + + /// Returns the index of `component` in the row, if it exists. + /// + /// This is `O(n)`. + #[inline] + pub fn find_component(&self, component: &ComponentName) -> Option { + self.cells + .iter() + .map(|cell| cell.component()) + .position(|name| name == *component) + } +} + +// --- + +impl DataRow { + pub fn from_cells1( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: C0, + ) -> DataRow + where + C0: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [into_cells.into()], + ) + } + + pub fn try_from_cells1( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: C0, + ) -> DataRowResult + where + C0: TryInto, + DataRowError: From<>::Error>, + { + Ok(Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [into_cells.try_into()?], + )) + } + + pub fn from_cells2( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1), + ) -> DataRow + where + C0: Into, + C1: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.into(), // + into_cells.1.into(), // + ], + ) + } + + pub fn try_from_cells2( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1), + ) -> DataRowResult + where + C0: TryInto, + C1: TryInto, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + { + Ok(Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.try_into()?, // + into_cells.1.try_into()?, // + ], + )) + } + + pub fn from_cells3( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1, C2), + ) -> DataRow + where + C0: Into, + C1: Into, + C2: Into, + { + Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.into(), // + into_cells.1.into(), // + into_cells.2.into(), // + ], + ) + } + + pub fn try_from_cells3( + row_id: MsgId, + entity_path: impl Into, + timepoint: impl Into, + num_instances: u32, + into_cells: (C0, C1, C2), + ) -> DataRowResult + where + C0: TryInto, + C1: TryInto, + C2: TryInto, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + DataRowError: From<>::Error>, + { + Ok(Self::from_cells( + row_id, + timepoint.into(), + entity_path.into(), + num_instances, + [ + into_cells.0.try_into()?, // + into_cells.1.try_into()?, // + into_cells.2.try_into()?, // + ], + )) + } +} + +// --- + +impl std::fmt::Display for DataRow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // TODO + // let chunk = arrow2::chunk::Chunk::new( + // self.cells + // .iter() + // .map(|cell| wrap_in_listarray(cell.as_arrow()).boxed()) + // .collect(), + // ); + // eprintln!("{chunk:?}"); + // arrow2::chunk::Chunk::new(self.cells.iter().map(|cell| cell.as_arrow()).collect()); + + // TODO: dump metadata too + let table = re_format::arrow::format_table( + self.cells.iter().map(|cell| cell.as_arrow_monolist()), + self.cells.iter().map(|cell| cell.component()), + ); + table.fmt(f) + } +} + +// --- + +// TODO: +// - append +// - error paths + +#[test] +fn data_row() { + use crate::{ + component_types::{ColorRGBA, Label, Point2D}, + Timeline, + }; + + let row_id = MsgId::ZERO; + let num_instances = 2; + + let points: &[Point2D] = &[ + [10.0, 10.0].into(), // + [20.0, 20.0].into(), + ]; + let colors: &[_] = &[ + ColorRGBA::from_rgb(128, 128, 128), // + ]; + let labels: &[Label] = &[]; + + let row = DataRow::from_cells3( + row_id, + "a/b/c", + [ + (Timeline::new_sequence("frame_nr"), 42.into()), // + (Timeline::new_sequence("pouet"), 666.into()), // + ], + num_instances, + (points, colors, labels), + ); + eprintln!("{row}"); + + // TODO: checks +} diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs new file mode 100644 index 0000000000000..5f56211b3c07b --- /dev/null +++ b/crates/re_log_types/src/data_table.rs @@ -0,0 +1,296 @@ +use itertools::Itertools as _; +use nohash_hasher::{IntMap, IntSet}; + +use crate::{ComponentName, DataCell, DataRow, EntityPath, MsgId, TimePoint}; + +// TODO: any profile scopes needed? +// TODO: let's turn this into a msgbundle, and then another PR handles table and removes msgbundle + +// --- + +#[derive(thiserror::Error, Debug)] +pub enum DataTableError { + #[error("Could not serialize/deserialize component instances to/from Arrow")] + Arrow(#[from] arrow2::error::Error), +} + +pub type DataTableResult = ::std::result::Result; + +// --- + +// TODO: explain that while not contiguous... it in fact is + +// TODO +#[derive(Debug, Clone)] +pub struct DataTable { + /// Auto-generated [`TUID`], uniquely identifying this batch of data and keeping track of the + /// client's wall-clock. + // TODO(cmc): introduce EventId & BatchId + // TODO(cmc): use once batching lands + pub table_id: MsgId, + + /// The entire column of [`EventId`]s. + // TODO(cmc): introduce EventId & BatchId + pub row_id: Vec, + + /// The entire column of [`TimePoint`]s. + pub timepoint: Vec, + + /// The entire column of [`EntityPath`]s. + pub entity_path: Vec, + + /// The entire column of `num_instances`. + pub num_instances: Vec, + + /// All the rows for all the component columns. + /// + /// The cells are optional since not all rows will have data for every single component + /// (i.e. the table is sparse). + pub table: IntMap>>, +} + +impl DataTable { + /// Builds a new `DataTable` from an iterable of [`DataRow`]s. + // + // TODO: fails if? + // TODO: panicky version + pub fn from_rows(table_id: MsgId, rows: impl IntoIterator) -> Self { + let rows = rows.into_iter(); + + // Explode all rows into columns, and keep track of which components are involved. + let mut components = IntSet::default(); + let (row_id, timepoint, entity_path, num_instances, rows): ( + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + ) = rows + .map(|row| { + components.extend(row.components()); + let DataRow { + row_id, + timepoint, + entity_path, + num_instances, + cells, + } = row; + (row_id, timepoint, entity_path, num_instances, cells) + }) + .multiunzip(); + + // Pre-allocate all columns (one per component). + let mut table = IntMap::default(); + for component in components { + table.insert(component, vec![None; rows.len()]); + } + + // Fill all columns (where possible: data is likely sparse). + for (i, row) in rows.into_iter().enumerate() { + for cell in row { + let component = cell.component(); + // NOTE: unwrap cannot fail, all arrays pre-allocated above + table.get_mut(&component).unwrap()[i] = Some(cell); + } + } + + Self { + table_id, + row_id, + timepoint, + entity_path, + num_instances, + table, + } + } + + /// Append a row to an existing table. + /// + /// Returns an error if the row is not compatible with the table. + // + // TODO: fails if? + // TODO: panicky version + pub fn append_row(&mut self, row: DataRow) { + todo!() + } +} +// TODO(cmc): Some convenient `From` implementations etc + +// TODO: as_chunk? + +impl DataTable { + #[inline] + pub fn num_rows(&self) -> u32 { + self.row_id.len() as _ + } + + // TODO(cmc): implement + pub fn as_arrow() -> ( + arrow2::datatypes::Schema, + arrow2::chunk::Chunk>, + ) { + todo!() + } +} + +// --- + +// TODO: temporary transition period stuff + +use crate::msg_bundle::MsgBundle; + +impl DataTable { + pub fn as_rows(&self) -> impl ExactSizeIterator + '_ { + let num_rows = self.num_rows() as usize; + + let Self { + table_id: _, + row_id, + timepoint, + entity_path, + num_instances, + table, + } = self; + + (0..num_rows).map(move |i| { + let cells = table + .values() + .filter_map(|rows| rows[i].clone() /* shallow */); + + DataRow::from_cells( + row_id[i], + // TODO: get rid of those + timepoint[i].clone(), + entity_path[i].clone(), + num_instances[i], + cells, + ) + }) + } + + pub fn from_msg_bundle(msg_bundle: MsgBundle) -> Self { + let MsgBundle { + msg_id, + entity_path, + time_point, + cells, + } = msg_bundle; + + Self::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + msg_id, + time_point, + entity_path, + cells.first().map_or(0, |cell| cell.len()), + cells, + )], + ) + } + + pub fn into_msg_bundle(self) -> MsgBundle { + let mut rows = self.as_rows(); + assert!(rows.len() == 1, "must have 1 row, got {}", rows.len()); // TODO(cmc): batching + let row = rows.next().unwrap(); + + let DataRow { + row_id, + timepoint, + entity_path, + num_instances: _, + cells, + } = row; + + let table_id = row_id; // ! + + MsgBundle::new(table_id, entity_path, timepoint, cells) + } +} + +// --- + +impl std::fmt::Display for DataTable { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for row in self.as_rows() { + writeln!(f, "{row}")?; + } + Ok(()) + } +} + +// --- + +// TODO: +// - append +// - error paths + +#[test] +fn data_table() { + use crate::{ + component_types::{ColorRGBA, Label, Point2D}, + Timeline, + }; + + let table_id = MsgId::random(); + + let timepoint = |frame_nr: i64, pouet: i64| { + TimePoint::from([ + ( + Timeline::new("frame_nr", crate::TimeType::Sequence), + frame_nr.into(), + ), // + ( + Timeline::new("pouet", crate::TimeType::Sequence), + pouet.into(), + ), // + ]) + }; + + let row1 = { + let num_instances = 2; + let points: &[_] = &[ + Point2D::new(10.0, 10.0), // + Point2D::new(20.0, 20.0), + ]; + let colors: &[_] = &[ + ColorRGBA::from_rgb(128, 128, 128), // + ]; + let labels: &[Label] = &[]; + + DataRow::from_cells3( + MsgId::random(), + "a", + timepoint(1, 1), + num_instances, + (points, colors, labels), + ) + }; + + let row2 = { + let num_instances = 0; + let colors: &[ColorRGBA] = &[]; + + DataRow::from_cells1(MsgId::random(), "b", timepoint(1, 2), num_instances, colors) + }; + + let row3 = { + let num_instances = 1; + let colors: &[_] = &[ + ColorRGBA::from_rgb(128, 128, 128), // + ]; + let labels: &[_] = &[Label("hey".into())]; + + DataRow::from_cells2( + MsgId::random(), + "c", + timepoint(2, 1), + num_instances, + (colors, labels), + ) + }; + + let table = DataTable::from_rows(table_id, [row1, row2, row3]); + eprintln!("{table}"); + + // TODO: checks +} diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index d0b20f2391973..0b6f1f83e3b67 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -15,6 +15,8 @@ mod component; pub mod component_types; mod data; mod data_cell; +mod data_row; +mod data_table; pub mod hash; mod index; pub mod msg_bundle; @@ -46,6 +48,8 @@ pub use self::component_types::ViewCoordinates; pub use self::component_types::{EncodedMesh3D, Mesh3D, MeshFormat, MeshId, RawMesh3D}; pub use self::data::*; pub use self::data_cell::{DataCell, DataCellError, DataCellResult}; +pub use self::data_row::{DataRow, DataRowError, DataRowResult}; +pub use self::data_table::{DataTable, DataTableError, DataTableResult}; pub use self::index::*; pub use self::path::*; pub use self::time::{Duration, Time}; diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs index 673891fde3642..f40ac6e6aa03e 100644 --- a/crates/re_log_types/src/msg_bundle.rs +++ b/crates/re_log_types/src/msg_bundle.rs @@ -24,7 +24,6 @@ use arrow2::{ array::{Array, ListArray, StructArray}, chunk::Chunk, datatypes::{DataType, Field, Schema}, - offset::Offsets, }; use arrow2_convert::{field::ArrowField, serialize::TryIntoArrow}; @@ -35,6 +34,7 @@ use crate::{ // --- +// TODO: can probably make that one pub(crate) already /// The errors that can occur when trying to convert between Arrow and `MessageBundle` types #[derive(thiserror::Error, Debug)] pub enum MsgBundleError { @@ -97,26 +97,18 @@ impl MsgBundle { /// /// The `MsgId` will automatically be appended as a component to the given `bundles`, allowing /// the backend to keep track of the origin of any row of data. - pub fn new( + pub(crate) fn new( msg_id: MsgId, entity_path: EntityPath, time_point: TimePoint, - components: Vec, + cells: Vec, ) -> Self { - let mut this = Self { + Self { msg_id, entity_path, time_point, - cells: components, - }; - - // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, - // we need to craft an array of `MsgId`s that matches the length of the other components. - this.cells.push(DataCell::from_native( - vec![msg_id; this.num_instances()].iter(), - )); - - this + cells, + } } /// Returns the number of component collections in this bundle, i.e. the length of the bundle @@ -133,7 +125,7 @@ impl MsgBundle { /// have the same number of instances, we simply pick the value for the first component /// collection. #[inline] - pub fn num_instances(&self) -> usize { + pub fn num_instances(&self) -> u32 { self.cells.first().map_or(0, |cell| cell.len()) } @@ -144,7 +136,7 @@ impl MsgBundle { pub fn find_component(&self, component: &ComponentName) -> Option { self.cells .iter() - .map(|cell| cell.name()) + .map(|cell| cell.component()) .position(|name| name == *component) } } @@ -152,7 +144,7 @@ impl MsgBundle { impl std::fmt::Display for MsgBundle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let values = self.cells.iter().map(|cell| cell.as_arrow()); - let names = self.cells.iter().map(|cell| cell.name().as_str()); + let names = self.cells.iter().map(|cell| cell.component().as_str()); let table = re_format::arrow::format_table(values, names); f.write_fmt(format_args!( "MsgBundle '{}' @ {:?}:\n{table}", @@ -168,9 +160,9 @@ fn pack_components(cells: impl Iterator) -> (Schema, StructArra .map(|cell| { // NOTE: wrap in a ListArray to emulate the presence of rows, this'll go away with // batching. - let data = wrap_in_listarray(cell.as_arrow()).to_boxed(); + let data = cell.as_arrow_monolist(); ( - Field::new(cell.name().as_str(), data.data_type().clone(), false), + Field::new(cell.component().as_str(), data.data_type().clone(), false), data, ) }) @@ -314,131 +306,3 @@ fn extract_components(schema: &Schema, msg: &Chunk>) -> Result) -> ListArray { - let datatype = ListArray::::default_datatype(field_array.data_type().clone()); - let offsets = Offsets::try_from_lengths(std::iter::once(field_array.len())) - .unwrap() - .into(); - let values = field_array; - let validity = None; - ListArray::::new(datatype, offsets, values, validity) -} - -/// Helper to build a `MessageBundle` from 1 component -pub fn try_build_msg_bundle1( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: C0, -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![into_cells.try_into()?], - )) -} - -/// Helper to build a `MessageBundle` from 2 components -pub fn try_build_msg_bundle2( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: (C0, C1), -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - C1: TryInto, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![into_cells.0.try_into()?, into_cells.1.try_into()?], - )) -} - -/// Helper to build a `MessageBundle` from 3 components -pub fn try_build_msg_bundle3( - msg_id: MsgId, - into_entity_path: O, - into_time_point: T, - into_cells: (C0, C1, C2), -) -> Result -where - O: Into, - T: Into, - C0: TryInto, - C1: TryInto, - C2: TryInto, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, -{ - Ok(MsgBundle::new( - msg_id, - into_entity_path.into(), - into_time_point.into(), - vec![ - into_cells.0.try_into()?, - into_cells.1.try_into()?, - into_cells.2.try_into()?, - ], - )) -} - -// ---------------------------------------------------------------------------- - -#[test] -fn msg_bundle() { - use crate::{ - component_types::{ColorRGBA, Point2D}, - Timeline, - }; - - let points = &[ - Point2D::new(10.0, 10.0), // - Point2D::new(20.0, 20.0), - Point2D::new(30.0, 30.0), - ]; - let color = &[ - ColorRGBA::from_rgb(128, 128, 128), // - ]; - - let msg_id = MsgId::random(); - let ent_path = EntityPath::from("a/b/c"); - let timepoint = TimePoint::from([ - ( - Timeline::new("frame_nr", crate::TimeType::Sequence), - 42.into(), - ), // - ( - Timeline::new("pouet", crate::TimeType::Sequence), - 666.into(), - ), // - ]); - - let cells = vec![DataCell::from_native(points), DataCell::from_native(color)]; - - let msg_bundle_before = MsgBundle::new(msg_id, ent_path, timepoint, cells); - let arrow_msg: ArrowMsg = msg_bundle_before.try_into().unwrap(); - let msg_bundle_after: MsgBundle = (&arrow_msg).try_into().unwrap(); - - // TODO(cmc): some more serious roundtrip tests once we introduce DataRow - - _ = msg_bundle_after; -} diff --git a/crates/re_query/benches/query_benchmark.rs b/crates/re_query/benches/query_benchmark.rs index 7ec74cddb8197..d65c2947cbeba 100644 --- a/crates/re_query/benches/query_benchmark.rs +++ b/crates/re_query/benches/query_benchmark.rs @@ -8,9 +8,7 @@ use re_arrow_store::{DataStore, LatestAtQuery}; use re_log_types::{ component_types::{ColorRGBA, InstanceKey, Point2D, Vec3D}, datagen::{build_frame_nr, build_some_colors, build_some_point2d, build_some_vec3d}, - entity_path, - msg_bundle::{try_build_msg_bundle1, try_build_msg_bundle2, MsgBundle}, - Component, EntityPath, Index, MsgId, TimeType, Timeline, + entity_path, Component, DataRow, EntityPath, Index, MsgId, TimeType, Timeline, }; use re_query::query_entity_with_primary; @@ -45,7 +43,7 @@ fn mono_points(c: &mut Criterion) { let paths = (0..NUM_POINTS) .map(move |point_idx| entity_path!("points", Index::Sequence(point_idx as _))) .collect_vec(); - let msgs = build_points_messages(&paths, 1); + let msgs = build_points_rows(&paths, 1); { let mut group = c.benchmark_group("arrow_mono_points"); @@ -55,14 +53,14 @@ fn mono_points(c: &mut Criterion) { (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_mono_points"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_points(&mut store, &paths)); }); @@ -72,7 +70,7 @@ fn mono_points(c: &mut Criterion) { fn batch_points(c: &mut Criterion) { // Batch points are logged together at a single path let paths = [EntityPath::from("points")]; - let msgs = build_points_messages(&paths, NUM_POINTS as _); + let msgs = build_points_rows(&paths, NUM_POINTS as _); { let mut group = c.benchmark_group("arrow_batch_points"); @@ -80,14 +78,14 @@ fn batch_points(c: &mut Criterion) { (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_batch_points"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_points(&mut store, &paths)); }); @@ -97,7 +95,7 @@ fn batch_points(c: &mut Criterion) { fn batch_vecs(c: &mut Criterion) { // Batch points are logged together at a single path let paths = [EntityPath::from("vec")]; - let msgs = build_vecs_messages(&paths, NUM_VECS as _); + let msgs = build_vecs_rows(&paths, NUM_VECS as _); { let mut group = c.benchmark_group("arrow_batch_vecs"); @@ -105,14 +103,14 @@ fn batch_vecs(c: &mut Criterion) { (NUM_VECS * NUM_FRAMES_VECS) as _, )); group.bench_function("insert", |b| { - b.iter(|| insert_messages(msgs.iter())); + b.iter(|| insert_rows(msgs.iter())); }); } { let mut group = c.benchmark_group("arrow_batch_vecs"); group.throughput(criterion::Throughput::Elements(NUM_VECS as _)); - let mut store = insert_messages(msgs.iter()); + let mut store = insert_rows(msgs.iter()); group.bench_function("query", |b| { b.iter(|| query_and_visit_vecs(&mut store, &paths)); }); @@ -121,41 +119,41 @@ fn batch_vecs(c: &mut Criterion) { // --- Helpers --- -fn build_points_messages(paths: &[EntityPath], pts: usize) -> Vec { +fn build_points_rows(paths: &[EntityPath], pts: usize) -> Vec { (0..NUM_FRAMES_POINTS) .flat_map(move |frame_idx| { paths.iter().map(move |path| { - try_build_msg_bundle2( + DataRow::from_cells2( MsgId::ZERO, path.clone(), [build_frame_nr((frame_idx as i64).into())], + pts as _, (build_some_point2d(pts), build_some_colors(pts)), ) - .unwrap() }) }) .collect() } -fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec { +fn build_vecs_rows(paths: &[EntityPath], pts: usize) -> Vec { (0..NUM_FRAMES_VECS) .flat_map(move |frame_idx| { paths.iter().map(move |path| { - try_build_msg_bundle1( + DataRow::from_cells1( MsgId::ZERO, path.clone(), [build_frame_nr((frame_idx as i64).into())], + pts as _, build_some_vec3d(pts), ) - .unwrap() }) }) .collect() } -fn insert_messages<'a>(msgs: impl Iterator) -> DataStore { +fn insert_rows<'a>(msgs: impl Iterator) -> DataStore { let mut store = DataStore::new(InstanceKey::name(), Default::default()); - msgs.for_each(|msg_bundle| store.insert(msg_bundle).unwrap()); + msgs.for_each(|row| store.insert_row(row).unwrap()); store } diff --git a/crates/re_query/examples/range.rs b/crates/re_query/examples/range.rs index 365c32b8da0c7..47251d85c75c8 100644 --- a/crates/re_query/examples/range.rs +++ b/crates/re_query/examples/range.rs @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, RangeQuery, TimeRange}; use re_log_types::{ component_types::{InstanceKey, Point2D, Rect2D}, datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - msg_bundle::try_build_msg_bundle1, - Component as _, EntityPath, MsgId, TimeType, + Component as _, DataRow, EntityPath, MsgId, TimeType, }; use re_query::range_entity_with_primary; @@ -24,28 +23,28 @@ fn main() { let frame4 = [build_frame_nr(4.into())]; let rects = build_some_rects(2); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame1, &rects).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame1, 2, &rects); + store.insert_row(&row).unwrap(); let points = build_some_point2d(2); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame2, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame2, 2, &points); + store.insert_row(&row).unwrap(); let points = build_some_point2d(4); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame3, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame3, 4, &points); + store.insert_row(&row).unwrap(); let rects = build_some_rects(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &rects); + store.insert_row(&row).unwrap(); let points = build_some_point2d(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &points); + store.insert_row(&row).unwrap(); let rects = build_some_rects(3); - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), frame4, 3, &rects); + store.insert_row(&row).unwrap(); let query = RangeQuery::new(frame2[0].0, TimeRange::new(frame2[0].1, frame4[0].1)); diff --git a/crates/re_query/src/query.rs b/crates/re_query/src/query.rs index 0ae13f744a37a..f52130654907a 100644 --- a/crates/re_query/src/query.rs +++ b/crates/re_query/src/query.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use re_arrow_store::{DataStore, LatestAtQuery}; -use re_log_types::{component_types::InstanceKey, Component, ComponentName, EntityPath}; +use re_log_types::{component_types::InstanceKey, Component, ComponentName, DataRow, EntityPath}; use crate::{ComponentWithInstances, EntityView, QueryError}; @@ -148,7 +148,6 @@ pub fn __populate_example_store() -> DataStore { use re_log_types::{ component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle2, MsgId, }; @@ -160,15 +159,26 @@ pub fn __populate_example_store() -> DataStore { let instances = vec![InstanceKey(42), InstanceKey(96)]; let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (&instances, &points)).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells2( + MsgId::ZERO, + ent_path, + timepoint, + instances.len() as _, + (&instances, &points), + ); + store.insert_row(&row).unwrap(); let instances = vec![InstanceKey(96)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = - try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (instances, colors)).unwrap(); - store.insert(&bundle).unwrap(); + + let row = DataRow::from_cells2( + MsgId::ZERO, + ent_path, + timepoint, + instances.len() as _, + (instances, colors), + ); + store.insert_row(&row).unwrap(); store } diff --git a/crates/re_query/tests/query_tests.rs b/crates/re_query/tests/query_tests.rs index be8c1908d7a83..a251941bf54ac 100644 --- a/crates/re_query/tests/query_tests.rs +++ b/crates/re_query/tests/query_tests.rs @@ -5,9 +5,7 @@ use re_log_types::{ component_types::InstanceKey, component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle1, - msg_bundle::try_build_msg_bundle2, - Component, MsgId, + Component, DataRow, MsgId, }; use re_query::query_entity_with_primary; @@ -20,20 +18,20 @@ fn simple_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path, timepoint, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -91,15 +89,14 @@ fn timeless_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance.. timelessly! let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = - try_build_msg_bundle2(MsgId::random(), ent_path, [], (color_instances, colors)).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells2(MsgId::random(), ent_path, [], 1, (color_instances, colors)); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -157,13 +154,13 @@ fn no_instance_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &points); + store.insert_row(&row).unwrap(); // Assign them colors with explicit instances let colors = vec![ColorRGBA(0xff000000), ColorRGBA(0x00ff0000)]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &colors).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, &colors); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -221,8 +218,8 @@ fn missing_column_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, points); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -279,20 +276,20 @@ fn splatted_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path, timepoint, 2, points); + store.insert_row(&row).unwrap(); // Assign all of them a color via splat let color_instances = vec![InstanceKey::SPLAT]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path, timepoint, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); diff --git a/crates/re_query/tests/range_tests.rs b/crates/re_query/tests/range_tests.rs index 8c007ae82a079..54f2cf62a3d7e 100644 --- a/crates/re_query/tests/range_tests.rs +++ b/crates/re_query/tests/range_tests.rs @@ -5,9 +5,7 @@ use re_log_types::{ component_types::InstanceKey, component_types::{ColorRGBA, Point2D}, datagen::build_frame_nr, - msg_bundle::try_build_msg_bundle1, - msg_bundle::try_build_msg_bundle2, - Component, EntityPath, MsgId, + Component, DataRow, EntityPath, MsgId, }; use re_query::range_entity_with_primary; @@ -21,21 +19,20 @@ fn simple_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -43,23 +40,22 @@ fn simple_range() { // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(0)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, &points); + store.insert_row(&row).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- @@ -241,35 +237,34 @@ fn timeless_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, &points); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), [], 2, &points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances.clone(), colors.clone()), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), [], + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -277,37 +272,36 @@ fn timeless_range() { // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(0)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances.clone(), colors.clone()), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, &points); + store.insert_row(&row).unwrap(); // Insert timelessly too! - let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), [], 2, &points); + store.insert_row(&row).unwrap(); } // ┌───────────┬──────────┬────────┬─────────────────┬────────────────────┬──────────────────────┬────────────────────────────┐ @@ -676,21 +670,20 @@ fn simple_splatted_range() { { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint1, 2, points); + store.insert_row(&row).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint1, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -698,23 +691,22 @@ fn simple_splatted_range() { // Assign one of them a color with a splatted instance let color_instances = vec![InstanceKey::SPLAT]; let colors = vec![ColorRGBA(0x00ff0000)]; - let bundle = try_build_msg_bundle2( + let row = DataRow::from_cells2( MsgId::random(), ent_path.clone(), timepoint2, + 1, (color_instances, colors), - ) - .unwrap(); - store.insert(&bundle).unwrap(); + ); + store.insert_row(&row).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; { // Create some points with implicit instances let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; - let bundle = - try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert(&bundle).unwrap(); + let row = DataRow::from_cells1(MsgId::random(), ent_path.clone(), timepoint3, 2, points); + store.insert_row(&row).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index 7be70fe3048b3..ed36f3b363c9c 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,4 +1,4 @@ -use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError}; +use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError, DataRow, DataTable}; use nohash_hasher::IntMap; @@ -30,7 +30,7 @@ pub enum MsgSenderError { "All component collections must share the same number of instances (i.e. row length) \ for a given row, got {0:?} instead" )] - MismatchedRowLengths(Vec<(ComponentName, usize)>), + MismatchedRowLengths(Vec<(ComponentName, u32)>), /// Instance keys cannot be splatted #[error("Instance keys cannot be splatted")] @@ -90,7 +90,7 @@ pub struct MsgSender { /// collection will always be 1. /// The number of instances per row, on the other hand, will be decided based upon the first /// component collection that's appended. - num_instances: Option, + num_instances: Option, /// All the instanced component collections that have been appended to this message. /// @@ -202,7 +202,7 @@ impl MsgSender { let collections = self .instanced .into_iter() - .map(|cell| (cell.name(), cell.len())) + .map(|cell| (cell.component(), cell.len())) .collect(); return Err(MsgSenderError::MismatchedRowLengths(collections)); } @@ -289,7 +289,7 @@ impl MsgSender { entity_path, timepoint, timeless, - num_instances: _, + num_instances, instanced, mut splatted, } = self; @@ -306,14 +306,14 @@ impl MsgSender { let mut all_cells: Vec<_> = instanced.into_iter().map(Some).collect(); let standard_cells: Vec<_> = all_cells .iter_mut() - .filter(|cell| cell.as_ref().unwrap().name() != Transform::name()) + .filter(|cell| cell.as_ref().unwrap().component() != Transform::name()) .map(|cell| cell.take().unwrap()) .collect(); let transform_cells: Vec<_> = all_cells .iter_mut() .filter(|cell| { cell.as_ref() - .map_or(false, |cell| cell.name() == Transform::name()) + .map_or(false, |cell| cell.component() == Transform::name()) }) .map(|cell| cell.take().unwrap()) .collect(); @@ -330,7 +330,7 @@ impl MsgSender { .chain(&transform_cells) .chain(&splatted) { - *rows_per_comptype.entry(cell.name()).or_default() += 1; + *rows_per_comptype.entry(cell.component()).or_default() += 1; } if rows_per_comptype.values().any(|num_rows| *num_rows > 1) { return Err(MsgSenderError::MoreThanOneRow( @@ -346,30 +346,53 @@ impl MsgSender { let mut msgs = [(); 3].map(|_| None); + // TODO: still shouldnt have any into_msg_bundle here + // Standard msgs[0] = (!standard_cells.is_empty()).then(|| { - MsgBundle::new( - MsgId::random(), - entity_path.clone(), - timepoint.clone(), - standard_cells, + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_instances.unwrap_or(0), + standard_cells, + )], ) + .into_msg_bundle() }); // Transforms msgs[1] = (!transform_cells.is_empty()).then(|| { - MsgBundle::new( - MsgId::random(), - entity_path.clone(), - timepoint.clone(), - transform_cells, + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_transform_instances, + transform_cells, + )], ) + .into_msg_bundle() }); // Splats + // TODO(cmc): unsplit splats once new data cells are in msgs[2] = (!splatted.is_empty()).then(|| { splatted.push(DataCell::from_native(&[InstanceKey::SPLAT])); - MsgBundle::new(MsgId::random(), entity_path, timepoint, splatted) + DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells( + MsgId::random(), + timepoint, + entity_path, + 1, + splatted, + )], + ) + .into_msg_bundle() }); Ok(msgs) diff --git a/crates/re_viewer/src/ui/data_ui/mod.rs b/crates/re_viewer/src/ui/data_ui/mod.rs index 1e8f5d3425f56..ea0b0ea43b0a1 100644 --- a/crates/re_viewer/src/ui/data_ui/mod.rs +++ b/crates/re_viewer/src/ui/data_ui/mod.rs @@ -78,7 +78,7 @@ impl DataUi for [DataCell] { _query: &re_arrow_store::LatestAtQuery, ) { let mut sorted = self.to_vec(); - sorted.sort_by_key(|cb| cb.name()); + sorted.sort_by_key(|cb| cb.component()); match verbosity { UiVerbosity::Small | UiVerbosity::MaxHeight(_) => { @@ -98,7 +98,7 @@ impl DataUi for [DataCell] { fn format_cell(cell: &DataCell) -> String { // TODO(emilk): if there's only once instance, and the byte size is small, then deserialize and show the value. - format!("{}x {}", cell.len(), cell.name().short_name()) + format!("{}x {}", cell.len(), cell.component().short_name()) } impl DataUi for PathOp { diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 433ab9b91c3e2..2c50f9992f7f0 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -1,6 +1,7 @@ //! Methods for handling Arrow datamodel log ingest use arrow2::{array::Array, datatypes::Field, ffi}; +use itertools::Itertools as _; use pyo3::{ exceptions::{PyAttributeError, PyValueError}, ffi::Py_uintptr_t, @@ -9,9 +10,8 @@ use pyo3::{ PyAny, PyResult, }; use re_log_types::{ - component_types, - msg_bundle::{MsgBundle, MsgBundleError}, - DataCell, EntityPath, LogMsg, MsgId, TimePoint, + component_types, msg_bundle::MsgBundleError, DataCell, DataRow, EntityPath, LogMsg, MsgId, + TimePoint, }; /// Perform conversion between a pyarrow array to arrow2 types. @@ -102,15 +102,21 @@ pub fn build_chunk_from_components( .into_iter() .zip(fields.into_iter()) .map(|(value, field)| DataCell::from_arrow(field.name.into(), value)) - .collect(); + .collect_vec(); - let msg_bundle = MsgBundle::new( + let num_instances = cells.first().map_or(0, |cell| cell.len()); + let row = DataRow::from_cells( MsgId::random(), - entity_path.clone(), time_point.clone(), + entity_path.clone(), + num_instances, cells, ); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle .try_into() .map_err(|e: MsgBundleError| PyValueError::new_err(e.to_string()))?; diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 43c70d572bb0e..7f56b5107fbe8 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -10,6 +10,7 @@ use pyo3::{ types::PyDict, }; +use re_log_types::DataRow; use rerun::{ log::{LogMsg, MsgBundle, MsgId, PathOp}, time::{Time, TimeInt, TimePoint, TimeType, Timeline}, @@ -462,14 +463,18 @@ fn log_transform( // python side will take a bit of additional work and testing to ensure we aren't // introducing new numerical issues. - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![transform].try_into().unwrap()], + 1, + [transform].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -542,14 +547,19 @@ fn log_view_coordinates( // non-trivial. Implementing this functionality on the python side will take // a bit of additional work and testing to ensure we aren't introducing new // conversion errors. - let bundle = MsgBundle::new( + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![coordinates].try_into().unwrap()], + 1, + [coordinates].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -644,14 +654,18 @@ fn log_meshes( // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![meshes.try_into().unwrap()], + meshes.len() as _, + meshes, ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -722,14 +736,18 @@ fn log_mesh_file( // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![mesh3d].try_into().unwrap()], + 1, + [mesh3d].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -799,26 +817,30 @@ fn log_image_file( let time_point = time(timeless); - let bundle = MsgBundle::new( + let tensor = re_log_types::component_types::Tensor { + tensor_id: TensorId::random(), + shape: vec![ + TensorDimension::height(h as _), + TensorDimension::width(w as _), + TensorDimension::depth(3), + ], + data: re_log_types::component_types::TensorData::JPEG(img_bytes.into()), + meaning: re_log_types::component_types::TensorDataMeaning::Unknown, + meter: None, + }; + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![re_log_types::component_types::Tensor { - tensor_id: TensorId::random(), - shape: vec![ - TensorDimension::height(h as _), - TensorDimension::width(w as _), - TensorDimension::depth(3), - ], - data: re_log_types::component_types::TensorData::JPEG(img_bytes.into()), - meaning: re_log_types::component_types::TensorDataMeaning::Unknown, - meter: None, - }] - .try_into() - .unwrap()], + 1, + [tensor].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg)); @@ -886,14 +908,19 @@ fn log_annotation_context( // implementation. // // TODO(jleibs) replace with python-native implementation - let bundle = MsgBundle::new( + + let row = DataRow::from_cells1( MsgId::random(), entity_path, time_point, - vec![vec![annotation_context.clone()].try_into().unwrap()], + 1, + [annotation_context].as_slice(), ); - let msg = bundle.try_into().unwrap(); + let msg_bundle = row + .into_table(MsgId::ZERO /* not used (yet) */) + .into_msg_bundle(); + let msg = msg_bundle.try_into().unwrap(); session.send(LogMsg::ArrowMsg(msg));