From 17a21bffb8c36e737df12bcba823d4bd25c0d077 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 27 Mar 2023 10:08:47 +0200 Subject: [PATCH] end-to-end batching 1: introduce `DataCell` & retire `ComponentBundle` (#1634) * introduce DataCell, retire ComponentBundle * oh. my. god. * other misc improvements while we're here... * addressing PR comments * link to 1712 * auto rename failed me * addressing PR comments --- Cargo.lock | 1 + Cargo.toml | 3 +- crates/re_arrow_store/Cargo.toml | 2 +- crates/re_arrow_store/benches/data_store.rs | 2 +- .../re_arrow_store/examples/dump_dataframe.rs | 18 +- .../examples/latest_component.rs | 4 +- .../examples/latest_components.rs | 4 +- .../examples/range_components.rs | 14 +- crates/re_arrow_store/src/arrow_util.rs | 195 +++----- crates/re_arrow_store/src/lib.rs | 2 +- crates/re_arrow_store/src/store.rs | 4 +- crates/re_arrow_store/src/store_write.rs | 342 +++++-------- crates/re_arrow_store/tests/correctness.rs | 92 +--- crates/re_arrow_store/tests/data_store.rs | 52 +- crates/re_arrow_store/tests/internals.rs | 8 +- crates/re_data_store/Cargo.toml | 2 +- crates/re_data_store/src/log_db.rs | 30 +- crates/re_log_types/Cargo.toml | 1 + crates/re_log_types/src/arrow_msg.rs | 4 + .../src/component_types/instance_key.rs | 7 + crates/re_log_types/src/data_cell.rs | 448 ++++++++++++++++++ crates/re_log_types/src/datagen.rs | 2 + crates/re_log_types/src/lib.rs | 4 +- crates/re_log_types/src/msg_bundle.rs | 353 ++++---------- crates/re_memory/Cargo.toml | 2 +- crates/re_query/Cargo.toml | 4 +- crates/re_query/benches/query_benchmark.rs | 2 +- crates/re_query/examples/range.rs | 12 +- crates/re_query/src/entity_view.rs | 4 - crates/re_query/src/query.rs | 4 +- crates/re_query/tests/query_tests.rs | 18 +- crates/re_query/tests/range_tests.rs | 32 +- crates/re_renderer/Cargo.toml | 2 +- crates/re_sdk/src/lib.rs | 5 +- crates/re_sdk/src/msg_sender.rs | 103 ++-- crates/re_viewer/Cargo.toml | 2 +- crates/re_viewer/src/ui/data_ui/log_msg.rs 
| 2 +- crates/re_viewer/src/ui/data_ui/mod.rs | 16 +- crates/re_viewer/src/ui/event_log_view.rs | 2 +- crates/rerun/Cargo.toml | 2 +- examples/rust/api_demo/Cargo.toml | 2 +- examples/rust/dna/Cargo.toml | 2 +- examples/rust/objectron/Cargo.toml | 2 +- rerun_py/Cargo.toml | 2 +- rerun_py/src/arrow.rs | 12 +- 45 files changed, 928 insertions(+), 898 deletions(-) create mode 100644 crates/re_log_types/src/data_cell.rs diff --git a/Cargo.lock b/Cargo.lock index 0465073952f6..ea9414e73bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3861,6 +3861,7 @@ dependencies = [ "glam", "half 2.2.1", "image", + "itertools", "lazy_static", "macaw", "mimalloc", diff --git a/Cargo.toml b/Cargo.toml index 8bdd54a78980..b70d80928b72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,9 +59,9 @@ ctrlc = { version = "3.0", features = ["termination"] } ecolor = "0.21.0" eframe = { version = "0.21.3", default-features = false } egui = "0.21.0" +egui-wgpu = "0.21.0" egui_dock = "0.4" egui_extras = "0.21.0" -egui-wgpu = "0.21.0" emath = "0.21.0" enumset = "1.0.12" epaint = "0.21.0" @@ -69,6 +69,7 @@ glam = "0.22" gltf = "1.1" half = "2.0" image = { version = "0.24", default-features = false } +itertools = "0.10" lazy_static = "1.4" macaw = "0.18" mimalloc = "0.1.29" diff --git a/crates/re_arrow_store/Cargo.toml b/crates/re_arrow_store/Cargo.toml index 71ab65fe1b60..8487f656b17f 100644 --- a/crates/re_arrow_store/Cargo.toml +++ b/crates/re_arrow_store/Cargo.toml @@ -41,7 +41,7 @@ arrow2 = { workspace = true, features = [ ] } document-features = "0.2" indent = "0.1" -itertools = "0.10" +itertools = { workspace = true } nohash-hasher = "0.2" parking_lot.workspace = true static_assertions = "1.1" diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs index 7ad044e94cd7..1392ee640697 100644 --- a/crates/re_arrow_store/benches/data_store.rs +++ b/crates/re_arrow_store/benches/data_store.rs @@ -166,7 +166,7 @@ fn insert_messages<'a>( msgs: impl Iterator, 
) -> DataStore { let mut store = DataStore::new(cluster_key, config); - msgs.for_each(|msg_bundle| store.insert(msg_bundle).unwrap()); + msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap()); store } diff --git a/crates/re_arrow_store/examples/dump_dataframe.rs b/crates/re_arrow_store/examples/dump_dataframe.rs index dba930600722..e6b16d46ecdf 100644 --- a/crates/re_arrow_store/examples/dump_dataframe.rs +++ b/crates/re_arrow_store/examples/dump_dataframe.rs @@ -29,48 +29,48 @@ fn main() { let bundle1 = test_bundle!(ent_path @ [ build_frame_nr(1.into()), build_log_time(Time::now()), ] => [build_some_instances(2), build_some_rects(2)]); - store.insert(&bundle1).unwrap(); + store.insert_row(&bundle1).unwrap(); } for ent_path in &ent_paths { let bundle2 = test_bundle!(ent_path @ [ build_frame_nr(2.into()) ] => [build_some_instances(2), build_some_point2d(2)]); - store.insert(&bundle2).unwrap(); + store.insert_row(&bundle2).unwrap(); // Insert timelessly too! let bundle2 = test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]); - store.insert(&bundle2).unwrap(); + store.insert_row(&bundle2).unwrap(); let bundle3 = test_bundle!(ent_path @ [ build_frame_nr(3.into()), build_log_time(Time::now()), ] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert(&bundle3).unwrap(); + store.insert_row(&bundle3).unwrap(); // Insert timelessly too! 
let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]); - store.insert(&bundle3).unwrap(); + store.insert_row(&bundle3).unwrap(); } for ent_path in &ent_paths { let bundle4_1 = test_bundle!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), ] => [build_some_instances_from(20..23), build_some_rects(3)]); - store.insert(&bundle4_1).unwrap(); + store.insert_row(&bundle4_1).unwrap(); let bundle4_15 = test_bundle!(ent_path @ [ build_frame_nr(4.into()), ] => [build_some_instances_from(20..23), build_some_point2d(3)]); - store.insert(&bundle4_15).unwrap(); + store.insert_row(&bundle4_15).unwrap(); let bundle4_2 = test_bundle!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), ] => [build_some_instances_from(25..28), build_some_rects(3)]); - store.insert(&bundle4_2).unwrap(); + store.insert_row(&bundle4_2).unwrap(); let bundle4_25 = test_bundle!(ent_path @ [ build_frame_nr(4.into()), build_log_time(Time::now()), ] => [build_some_instances_from(25..28), build_some_point2d(3)]); - store.insert(&bundle4_25).unwrap(); + store.insert_row(&bundle4_25).unwrap(); } let df = store.to_dataframe(); diff --git a/crates/re_arrow_store/examples/latest_component.rs b/crates/re_arrow_store/examples/latest_component.rs index 61f75d9ee7f4..8a4f9563cc0b 100644 --- a/crates/re_arrow_store/examples/latest_component.rs +++ b/crates/re_arrow_store/examples/latest_component.rs @@ -21,10 +21,10 @@ fn main() { let ent_path = EntityPath::from("my/entity"); let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); diff --git a/crates/re_arrow_store/examples/latest_components.rs 
b/crates/re_arrow_store/examples/latest_components.rs index f9f8a67b179d..dd00c46886be 100644 --- a/crates/re_arrow_store/examples/latest_components.rs +++ b/crates/re_arrow_store/examples/latest_components.rs @@ -20,10 +20,10 @@ fn main() { let ent_path = EntityPath::from("my/entity"); let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let df = latest_components( diff --git a/crates/re_arrow_store/examples/range_components.rs b/crates/re_arrow_store/examples/range_components.rs index 56b8f152fb7c..c25fe8db29cf 100644 --- a/crates/re_arrow_store/examples/range_components.rs +++ b/crates/re_arrow_store/examples/range_components.rs @@ -24,25 +24,25 @@ fn main() { let frame4 = 4.into(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]); - store.insert(&bundle).unwrap(); + 
store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into())); diff --git a/crates/re_arrow_store/src/arrow_util.rs b/crates/re_arrow_store/src/arrow_util.rs index 4fcc1e7791db..d54d08fd26b4 100644 --- a/crates/re_arrow_store/src/arrow_util.rs +++ b/crates/re_arrow_store/src/arrow_util.rs @@ -1,28 +1,16 @@ -use anyhow::bail; use arrow2::{ array::{ - growable::make_growable, Array, FixedSizeListArray, ListArray, PrimitiveArray, StructArray, - UnionArray, + growable::make_growable, Array, FixedSizeListArray, ListArray, StructArray, UnionArray, }, bitmap::Bitmap, datatypes::{DataType, Field, UnionMode}, offset::Offsets, - types::NativeType, }; use itertools::Itertools; // --- pub trait ArrayExt: Array { - /// Returns `true` if the array is dense (no nulls). - fn is_dense(&self) -> bool; - - /// Returns `true` if the array is both sorted (increasing order) and contains only unique - /// values. - /// - /// The array must be dense, otherwise the result of this method is undefined. - fn is_sorted_and_unique(&self) -> anyhow::Result; - /// Returns the length of the child array at the given index. /// /// * Panics if `self` is not a `ListArray`. @@ -40,38 +28,6 @@ pub trait ArrayExt: Array { } impl ArrayExt for dyn Array { - fn is_dense(&self) -> bool { - if let Some(validity) = self.validity() { - validity.unset_bits() == 0 - } else { - true - } - } - - fn is_sorted_and_unique(&self) -> anyhow::Result { - debug_assert!(self.is_dense()); - - fn is_sorted_and_unique_primitive(arr: &dyn Array) -> bool { - let values = arr.as_any().downcast_ref::>().unwrap(); - values.values().windows(2).all(|v| v[0] < v[1]) - } - - // TODO(cmc): support more datatypes as the need arise. 
- match self.data_type() { - DataType::Int8 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::Int16 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::Int32 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::Int64 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::UInt8 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::UInt16 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::UInt32 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::UInt64 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::Float32 => Ok(is_sorted_and_unique_primitive::(self)), - DataType::Float64 => Ok(is_sorted_and_unique_primitive::(self)), - _ => bail!("unsupported datatype: {:?}", self.data_type()), - } - } - /// Return the length of the first child. /// /// ## Panics @@ -262,103 +218,46 @@ impl ArrayExt for dyn Array { #[test] fn test_clean_for_polars_nomodify() { use re_log_types::datagen::build_some_colors; - use re_log_types::msg_bundle::ComponentBundle; + use re_log_types::DataCell; // Colors don't need polars cleaning - let bundle: ComponentBundle = build_some_colors(5).try_into().unwrap(); - let cleaned = bundle.value_boxed().clean_for_polars(); - assert_eq!(bundle.value_boxed(), cleaned); + let cell: DataCell = build_some_colors(5).try_into().unwrap(); + let cleaned = cell.as_arrow_ref().clean_for_polars(); + assert_eq!(cell.as_arrow_ref(), &*cleaned); } #[test] fn test_clean_for_polars_modify() { - use re_log_types::msg_bundle::ComponentBundle; - use re_log_types::{Pinhole, Transform}; + use re_log_types::{DataCell, Pinhole, Transform}; // transforms are a nice pathological type with both Unions and FixedSizeLists let transforms = vec![Transform::Pinhole(Pinhole { image_from_cam: [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]].into(), resolution: None, })]; - let bundle: ComponentBundle = transforms.try_into().unwrap(); - assert_eq!( - *bundle.value_boxed().data_type(), - DataType::List(Box::new(Field::new( - "item", 
- DataType::Union( - vec![ - Field::new("Unknown", DataType::Boolean, false), - Field::new( - "Rigid3", - DataType::Struct(vec![ - Field::new( - "rotation", - DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), - 4 - ), - false - ), - Field::new( - "translation", - DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), - 3 - ), - false - ) - ]), - false - ), - Field::new( - "Pinhole", - DataType::Struct(vec![ - Field::new( - "image_from_cam", - DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), - 9 - ), - false, - ), - Field::new( - "resolution", - DataType::FixedSizeList( - Box::new(Field::new("item", DataType::Float32, false)), - 2 - ), - true, - ), - ]), - false - ) - ], - None, - UnionMode::Dense - ), - true - ))) - ); - - let cleaned = bundle.value_boxed().clean_for_polars(); - + let cell: DataCell = transforms.try_into().unwrap(); assert_eq!( - *cleaned.data_type(), - DataType::List(Box::new(Field::new( - "item", - DataType::Struct(vec![ + *cell.datatype(), + DataType::Union( + vec![ Field::new("Unknown", DataType::Boolean, false), Field::new( "Rigid3", DataType::Struct(vec![ Field::new( "rotation", - DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + DataType::FixedSizeList( + Box::new(Field::new("item", DataType::Float32, false)), + 4 + ), false ), Field::new( "translation", - DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + DataType::FixedSizeList( + Box::new(Field::new("item", DataType::Float32, false)), + 3 + ), false ) ]), @@ -369,19 +268,67 @@ fn test_clean_for_polars_modify() { DataType::Struct(vec![ Field::new( "image_from_cam", - DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + DataType::FixedSizeList( + Box::new(Field::new("item", DataType::Float32, false)), + 9 + ), false, ), Field::new( "resolution", - DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + 
DataType::FixedSizeList( + Box::new(Field::new("item", DataType::Float32, false)), + 2 + ), true, ), ]), false ) - ],), - true - ))) + ], + None, + UnionMode::Dense + ), + ); + + let cleaned = cell.as_arrow_ref().clean_for_polars(); + + assert_eq!( + *cleaned.data_type(), + DataType::Struct(vec![ + Field::new("Unknown", DataType::Boolean, false), + Field::new( + "Rigid3", + DataType::Struct(vec![ + Field::new( + "rotation", + DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + false + ), + Field::new( + "translation", + DataType::List(Box::new(Field::new("item", DataType::Float32, false)),), + false + ) + ]), + false + ), + Field::new( + "Pinhole", + DataType::Struct(vec![ + Field::new( + "image_from_cam", + DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + false, + ), + Field::new( + "resolution", + DataType::List(Box::new(Field::new("item", DataType::Float32, false))), + true, + ), + ]), + false + ) + ],), ); } diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index ec95c47f0505..3f2e5454caaa 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -8,7 +8,7 @@ //! * See [`DataStore`] for an overview of the core data structures. //! * See [`DataStore::latest_at`] and [`DataStore::range`] for the documentation of the public //! read APIs. -//! * See [`DataStore::insert`] for the documentation of the public write APIs. +//! * See [`DataStore::insert_row`] for the documentation of the public write APIs. //! //! ## Feature flags #![doc = document_features::document_features!()] diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 7655aefd2e05..4d92abdd80a7 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -212,7 +212,7 @@ pub struct DataStore { /// This makes the cluster key a perfect candidate for joining query results together, and /// doing so as efficiently as possible. 
/// - /// See [`Self::insert`] for more information. + /// See [`Self::insert_row`] for more information. pub(crate) cluster_key: ComponentName, /// The configuration of the data store (e.g. bucket sizes). @@ -225,7 +225,7 @@ pub struct DataStore { /// Used to cache auto-generated cluster components, i.e. `[0]`, `[0, 1]`, `[0, 1, 2]`, etc /// so that they can be properly deduplicated. - pub(crate) cluster_comp_cache: IntMap, + pub(crate) cluster_comp_cache: IntMap, /// Dedicated index tables for timeless data. Never garbage collected. /// diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 0521c8eb355e..61ff4909693c 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -1,5 +1,5 @@ use arrow2::{ - array::{new_empty_array, Array, ListArray, UInt64Array}, + array::{new_empty_array, Array}, datatypes::DataType, }; use itertools::Itertools as _; @@ -8,36 +8,38 @@ use parking_lot::RwLock; use re_log::{debug, trace}; use re_log_types::{ - msg_bundle::{wrap_in_listarray, ComponentBundle, MsgBundle}, - ComponentName, EntityPath, MsgId, TimeInt, TimePoint, TimeRange, Timeline, + component_types::InstanceKey, + msg_bundle::{wrap_in_listarray, MsgBundle}, + ComponentName, DataCell, DataCellError, EntityPath, MsgId, TimeInt, TimePoint, TimeRange, + Timeline, }; use crate::{ - ArrayExt as _, ComponentBucket, ComponentTable, DataStore, DataStoreConfig, IndexBucket, - IndexBucketIndices, IndexTable, PersistentComponentTable, PersistentIndexTable, RowIndex, - RowIndexKind, TimeIndex, + ComponentBucket, ComponentTable, DataStore, DataStoreConfig, IndexBucket, IndexBucketIndices, + IndexTable, PersistentComponentTable, PersistentIndexTable, RowIndex, RowIndexKind, TimeIndex, }; +// TODO(#1619): +// - The store should insert column-per-column rather than row-per-row (purely a performance +// matter) +// - The store shouldn't ever deal with raw arrow arrays, use cells/rows/tables 
instead + // --- Data store --- #[derive(thiserror::Error, Debug)] pub enum WriteError { - // Batches - #[error("Cannot insert more than 1 row at a time, got {0}")] - MoreThanOneRow(usize), - - #[error("All components must have the same number of rows, got {0:?}")] - MismatchedRows(Vec<(ComponentName, usize)>), + #[error("Error with the underlying data cell")] + DataCell(#[from] DataCellError), // Clustering key #[error("The cluster component must be dense, got {0:?}")] - SparseClusteringComponent(Box), + SparseClusteringComponent(DataCell), #[error( "The cluster component must be increasingly sorted and not contain \ any duplicates, got {0:?}" )] - InvalidClusteringComponent(Box), + InvalidClusteringComponent(DataCell), // Instances #[error( @@ -47,9 +49,9 @@ pub enum WriteError { )] MismatchedInstances { cluster_comp: ComponentName, - cluster_comp_nb_instances: usize, + cluster_comp_nb_instances: u32, key: ComponentName, - num_instances: usize, + num_instances: u32, }, // Misc @@ -62,13 +64,12 @@ pub type WriteResult = ::std::result::Result; impl DataStore { /// Inserts a [`MsgBundle`]'s worth of components into the datastore. /// - /// * All components across the bundle must share the same number of rows. /// * All components within a single row must share the same number of instances. /// /// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated /// based on the length of the components in the payload, in the form of an array of /// monotonically increasing u64s going from `0` to `N-1`. - pub fn insert(&mut self, msg: &MsgBundle) -> WriteResult<()> { + pub fn insert_row(&mut self, msg: &MsgBundle) -> WriteResult<()> { // TODO(cmc): kind & insert_id need to somehow propagate through the span system. 
self.insert_id += 1; @@ -76,42 +77,30 @@ impl DataStore { msg_id, entity_path: ent_path, time_point, - components: bundles, + cells, } = msg; - if bundles.is_empty() { + if cells.is_empty() { return Ok(()); } crate::profile_function!(); let ent_path_hash = ent_path.hash(); - let num_rows = bundles[0].num_rows(); + // TODO(#1619): remove this thing when removing MsgBundle. + // // Effectively the same thing as having a non-unit length batch, except it's really not // worth more than an assertion since: // - A) `MsgBundle` should already guarantee this // - B) this limitation should be gone soon enough debug_assert!( - msg.components + msg.cells .iter() - .map(|bundle| bundle.name()) + .map(|cell| cell.component_name()) .all_unique(), "cannot insert same component multiple times, this is equivalent to multiple rows", ); - // Batches cannot contain more than 1 row at the moment. - if num_rows != 1 { - return Err(WriteError::MoreThanOneRow(num_rows)); - } - // Components must share the same number of rows. - if !bundles.iter().all(|bundle| bundle.num_rows() == num_rows) { - return Err(WriteError::MismatchedRows( - bundles - .iter() - .map(|bundle| (bundle.name(), bundle.num_rows())) - .collect(), - )); - } trace!( kind = "insert", @@ -121,23 +110,19 @@ impl DataStore { .map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time))) .collect::>(), entity = %ent_path, - components = ?bundles.iter().map(|bundle| bundle.name()).collect::>(), - num_rows, + components = ?cells.iter().map(|cell| cell.component_name()).collect_vec(), "insertion started..." 
); - let cluster_comp_pos = bundles + let cluster_comp_pos = cells .iter() - .find_position(|bundle| bundle.name() == self.cluster_key) + .find_position(|cell| cell.component_name() == self.cluster_key) .map(|(pos, _)| pos); if time_point.is_timeless() { let mut row_indices = IntMap::default(); - // TODO(#589): support for batched row component insertions - for row_nr in 0..num_rows { - self.insert_timeless_row(row_nr, cluster_comp_pos, bundles, &mut row_indices)?; - } + self.insert_timeless_row_helper(cluster_comp_pos, cells, &mut row_indices)?; let index = self .timeless_indices @@ -147,16 +132,7 @@ impl DataStore { } else { let mut row_indices = IntMap::default(); - // TODO(#589): support for batched row component insertions - for row_nr in 0..num_rows { - self.insert_row( - time_point, - row_nr, - cluster_comp_pos, - bundles, - &mut row_indices, - )?; - } + self.insert_row_helper(time_point, cluster_comp_pos, cells, &mut row_indices)?; for (timeline, time) in time_point.iter() { let ent_path = ent_path.clone(); // shallow @@ -168,27 +144,22 @@ impl DataStore { } } - // This is valuable information, even for a timeless timepoint! + // This is valuable information, even for a timeless timepoint! self.messages.insert(*msg_id, time_point.clone()); Ok(()) } - fn insert_timeless_row( + fn insert_timeless_row_helper( &mut self, - row_nr: usize, cluster_comp_pos: Option, - components: &[ComponentBundle], + cells: &[DataCell], row_indices: &mut IntMap, ) -> WriteResult<()> { crate::profile_function!(); - let (cluster_row_idx, cluster_len) = self.get_or_create_cluster_component( - row_nr, - cluster_comp_pos, - components, - &TimePoint::default(), - )?; + let (cluster_row_idx, cluster_len) = + self.get_or_create_cluster_component(cluster_comp_pos, cells, &TimePoint::default())?; // Always insert the cluster component.
row_indices.insert(self.cluster_key, cluster_row_idx); @@ -206,61 +177,45 @@ impl DataStore { ); } - for bundle in components + for cell in cells .iter() - .filter(|bundle| bundle.name() != self.cluster_key) + .filter(|cell| cell.component_name() != self.cluster_key) { - let (name, rows) = (bundle.name(), bundle.value_list()); - - // Unwrapping a ListArray is somewhat costly, especially considering we're just - // gonna rewrap it again in a minute... so we'd rather just slice it to a list of - // one instead. - // - // let rows_single = rows.slice(row_nr, 1); - // - // Except it turns out that slicing is _extremely_ costly! - // So use the fact that `rows` is always of unit-length for now. - let rows_single = rows; + let component = cell.component_name(); + let num_instances = cell.num_instances(); - let num_instances = rows_single.offsets().lengths().next().unwrap(); if num_instances != cluster_len { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: name, + key: component, num_instances, }); } let table = self .timeless_components - .entry(bundle.name()) - .or_insert_with(|| { - PersistentComponentTable::new( - name, - ListArray::::get_child_type(rows_single.data_type()), - ) - }); + .entry(cell.component_name()) + .or_insert_with(|| PersistentComponentTable::new(component, cell.datatype())); - let row_idx = table.push(rows_single); - row_indices.insert(name, row_idx); + let row_idx = table.push_cell(cell); + row_indices.insert(component, row_idx); } Ok(()) } - fn insert_row( + fn insert_row_helper( &mut self, time_point: &TimePoint, - row_nr: usize, cluster_comp_pos: Option, - components: &[ComponentBundle], + cells: &[DataCell], row_indices: &mut IntMap, ) -> WriteResult<()> { crate::profile_function!(); let (cluster_row_idx, cluster_len) = - self.get_or_create_cluster_component(row_nr, cluster_comp_pos, components, time_point)?; + self.get_or_create_cluster_component(cluster_comp_pos, 
cells, time_point)?; // Always insert the cluster component. row_indices.insert(self.cluster_key, cluster_row_idx); @@ -278,42 +233,29 @@ impl DataStore { ); } - for bundle in components + for cell in cells .iter() - .filter(|bundle| bundle.name() != self.cluster_key) + .filter(|cell| cell.component_name() != self.cluster_key) { - let (name, rows) = (bundle.name(), bundle.value_list()); + let component = cell.component_name(); + let num_instances = cell.num_instances(); - // Unwrapping a ListArray is somewhat costly, especially considering we're just - // gonna rewrap it again in a minute... so we'd rather just slice it to a list of - // one instead. - // - // let rows_single = rows.slice(row_nr, 1); - // - // Except it turns out that slicing is _extremely_ costly! - // So use the fact that `rows` is always of unit-length for now. - let rows_single = rows; - - // TODO(#440): support for splats - let num_instances = rows_single.offsets().lengths().next().unwrap(); if num_instances != cluster_len { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: name, + key: component, num_instances, }); } - let table = self.components.entry(bundle.name()).or_insert_with(|| { - ComponentTable::new( - name, - ListArray::::get_child_type(rows_single.data_type()), - ) - }); + let table = self + .components + .entry(component) + .or_insert_with(|| ComponentTable::new(component, cell.datatype())); - let row_idx = table.push(&self.config, time_point, rows_single); - row_indices.insert(name, row_idx); + let row_idx = table.push_cell(&self.config, time_point, cell); + row_indices.insert(component, row_idx); } Ok(()) @@ -327,37 +269,37 @@ impl DataStore { /// deduplication. 
fn get_or_create_cluster_component( &mut self, - _row_nr: usize, cluster_comp_pos: Option, - components: &[ComponentBundle], + cells: &[DataCell], time_point: &TimePoint, - ) -> WriteResult<(RowIndex, usize)> { + ) -> WriteResult<(RowIndex, u32)> { crate::profile_function!(); enum ClusterData<'a> { Cached(RowIndex), - GenData(Box), - UserData(&'a ListArray), + GenData(DataCell), + UserData(&'a DataCell), } let (cluster_len, cluster_data) = if let Some(cluster_comp_pos) = cluster_comp_pos { // We found a component with a name matching the cluster key's, let's make sure it's // valid (dense, sorted, no duplicates) and use that if so. - let cluster_comp = &components[cluster_comp_pos]; - let data = cluster_comp.value_list().values(); // abusing the fact that num_rows==1 - let len = data.len(); + let cluster_cell = &cells[cluster_comp_pos]; // Clustering component must be dense. - if !data.is_dense() { - return Err(WriteError::SparseClusteringComponent(data.clone())); + if !cluster_cell.is_dense() { + return Err(WriteError::SparseClusteringComponent(cluster_cell.clone())); } // Clustering component must be sorted and not contain any duplicates. - if !data.is_sorted_and_unique()? { - return Err(WriteError::InvalidClusteringComponent(data.clone())); + if !cluster_cell.is_sorted_and_unique()? { + return Err(WriteError::InvalidClusteringComponent(cluster_cell.clone())); } - (len, ClusterData::UserData(cluster_comp.value_list())) + ( + cluster_cell.num_instances(), + ClusterData::UserData(cluster_cell), + ) } else { // The caller has not specified any cluster component, and so we'll have to generate // one... unless we've already generated one of this exact length in the past, @@ -365,24 +307,33 @@ impl DataStore { // Use the length of any other component in the batch, they are guaranteed to all // share the same length at this point anyway. 
- let len = components.first().map_or(0, |comp| { - comp.value_list().offsets().lengths().next().unwrap() - }); + let len = cells.first().map_or(0, |comp| comp.num_instances()); if let Some(row_idx) = self.cluster_comp_cache.get(&len) { // Cache hit! Re-use that row index. (len, ClusterData::Cached(*row_idx)) } else { - // Cache miss! Craft a new u64 array from the ground up. - let data = UInt64Array::from_vec((0..len as u64).collect_vec()).boxed(); - let data = wrap_in_listarray(data).to_boxed(); - (len, ClusterData::GenData(data)) + // Cache miss! Craft new instance keys from the ground up. + + // TODO(#1712): That's exactly how one should create a cell of instance keys... but + // it turns out that running `TryIntoArrow` on a primitive type is orders of + // magnitude slower than manually creating the equivalent primitive array for some + // reason... + // let cell = DataCell::from_component::(0..len as u64); + + // ...so we create it manually instead. + use re_log_types::msg_bundle::Component as _; + let values = + arrow2::array::UInt64Array::from_vec((0..len as u64).collect_vec()).boxed(); + let cell = DataCell::from_arrow(InstanceKey::name(), values); + + (len, ClusterData::GenData(cell)) } }; match cluster_data { ClusterData::Cached(row_idx) => Ok((row_idx, cluster_len)), - ClusterData::GenData(data) => { + ClusterData::GenData(cell) => { // We had to generate a cluster component of the given length for the first time, // let's store it forever.
@@ -390,18 +341,15 @@ impl DataStore { .timeless_components .entry(self.cluster_key) .or_insert_with(|| { - PersistentComponentTable::new( - self.cluster_key, - ListArray::::get_child_type(data.data_type()), - ) + PersistentComponentTable::new(self.cluster_key, cell.datatype()) }); - let row_idx = table.push(&*data); + let row_idx = table.push_cell(&cell); self.cluster_comp_cache.insert(cluster_len, row_idx); Ok((row_idx, cluster_len)) } - ClusterData::UserData(data) => { + ClusterData::UserData(cell) => { // If we didn't hit the cache, then we have to insert this cluster component in // the right tables, just like any other component. @@ -410,20 +358,15 @@ impl DataStore { .timeless_components .entry(self.cluster_key) .or_insert_with(|| { - PersistentComponentTable::new( - self.cluster_key, - ListArray::::get_child_type(data.data_type()), - ) + PersistentComponentTable::new(self.cluster_key, cell.datatype()) }); - table.push(data) + table.push_cell(cell) } else { - let table = self.components.entry(self.cluster_key).or_insert_with(|| { - ComponentTable::new( - self.cluster_key, - ListArray::::get_child_type(data.data_type()), - ) - }); - table.push(&self.config, time_point, data) + let table = self + .components + .entry(self.cluster_key) + .or_insert_with(|| ComponentTable::new(self.cluster_key, cell.datatype())); + table.push_cell(&self.config, time_point, cell) }; Ok((row_idx, cluster_len)) @@ -994,42 +937,28 @@ impl PersistentComponentTable { } } - /// Pushes `rows_single` to the end of the bucket, returning the _global_ `RowIndex` of the + /// Pushes `cell` to the end of the bucket, returning the _global_ `RowIndex` of the /// freshly added row. - /// - /// `rows_single` must be a unit-length list of arrays of structs, - /// i.e. 
`ListArray`: - /// - the list layer corresponds to the different rows (always unit-length for now), - /// - the array layer corresponds to the different instances within that single row, - /// - and finally the struct layer holds the components themselves. - /// E.g.: - /// ```text - /// [[{x: 8.687487, y: 1.9590926}, {x: 2.0559108, y: 0.1494348}, {x: 7.09219, y: 0.9616637}]] - /// ``` - // - // TODO(#589): support for batched row component insertions - pub fn push(&mut self, rows_single: &dyn Array) -> RowIndex { + pub fn push_cell(&mut self, cell: &DataCell) -> RowIndex { crate::profile_function!(); debug_assert!( - ListArray::::get_child_type(rows_single.data_type()) == &self.datatype, + cell.datatype() == &self.datatype, "trying to insert data of the wrong datatype in a component table, \ expected {:?}, got {:?}", &self.datatype, - ListArray::::get_child_type(rows_single.data_type()), - ); - debug_assert!( - rows_single.len() == 1, - "batched row component insertions are not supported yet" + cell.datatype(), ); + // TODO(cmc): don't use raw arrays + let values = cell.as_arrow_monolist(); + self.total_rows += 1; // Warning: this is surprisingly costly! - self.total_size_bytes += - arrow2::compute::aggregate::estimated_bytes_size(rows_single) as u64; + self.total_size_bytes += arrow2::compute::aggregate::estimated_bytes_size(&*values) as u64; // TODO(#589): support for non-unit-length chunks - self.chunks.push(rows_single.to_boxed()); // shallow + self.chunks.push(values); RowIndex::from_u63(RowIndexKind::Timeless, self.chunks.len() as u64 - 1) } @@ -1050,38 +979,22 @@ impl ComponentTable { } } - /// Finds the appropriate bucket in this component table and pushes `rows_single` at the + /// Finds the appropriate bucket in this component table and pushes `cell` at the /// end of it, returning the _global_ `RowIndex` for this new row. - /// - /// `rows_single` must be a unit-length list of arrays of structs, - /// i.e. 
`ListArray`: - /// - the list layer corresponds to the different rows (always unit-length for now), - /// - the array layer corresponds to the different instances within that single row, - /// - and finally the struct layer holds the components themselves. - /// E.g.: - /// ```text - /// [[{x: 8.687487, y: 1.9590926}, {x: 2.0559108, y: 0.1494348}, {x: 7.09219, y: 0.9616637}]] - /// ``` - // - // TODO(#589): support for batched row component insertions - pub fn push( + pub fn push_cell( &mut self, config: &DataStoreConfig, time_point: &TimePoint, - rows_single: &dyn Array, + cell: &DataCell, ) -> RowIndex { crate::profile_function!(); debug_assert!( - ListArray::::get_child_type(rows_single.data_type()) == &self.datatype, + cell.datatype() == &self.datatype, "trying to insert data of the wrong datatype in a component table, \ expected {:?}, got {:?}", &self.datatype, - ListArray::::get_child_type(rows_single.data_type()), - ); - debug_assert!( - rows_single.len() == 1, - "batched row component insertions are not supported yet" + cell.datatype() ); // All component tables spawn with an initial bucket at row offset 0, thus this cannot @@ -1124,7 +1037,7 @@ impl ComponentTable { let active_bucket = self.buckets.back_mut().unwrap(); let row_idx = RowIndex::from_u63( RowIndexKind::Temporal, - active_bucket.push(time_point, rows_single) + active_bucket.row_offset, + active_bucket.push_cell(time_point, cell) + active_bucket.row_offset, ); trace!( @@ -1172,28 +1085,13 @@ impl ComponentBucket { } } - /// Pushes `rows_single` to the end of the bucket, returning the _local_ index of the + /// Pushes `cell` to the end of the bucket, returning the _local_ index of the /// freshly added row. - /// - /// `rows_single` must be a unit-length list of arrays of structs, - /// i.e. 
`ListArray`: - /// - the list layer corresponds to the different rows (always unit-length for now), - /// - the array layer corresponds to the different instances within that single row, - /// - and finally the struct layer holds the components themselves. - /// E.g.: - /// ```text - /// [[{x: 8.687487, y: 1.9590926}, {x: 2.0559108, y: 0.1494348}, {x: 7.09219, y: 0.9616637}]] - /// ``` - pub fn push(&mut self, time_point: &TimePoint, rows_single: &dyn Array) -> u64 { + pub fn push_cell(&mut self, timepoint: &TimePoint, cell: &DataCell) -> u64 { crate::profile_function!(); - debug_assert!( - rows_single.len() == 1, - "batched row component insertions are not supported yet" - ); - // Keep track of all affected time ranges, for garbage collection purposes. - for (timeline, &time) in time_point { + for (timeline, &time) in timepoint { self.time_ranges .entry(*timeline) .and_modify(|range| { @@ -1202,13 +1100,15 @@ impl ComponentBucket { .or_insert_with(|| TimeRange::new(time, time)); } + // TODO(cmc): don't use raw arrays + let values = cell.as_arrow_monolist(); + self.total_rows += 1; // Warning: this is surprisingly costly! 
- self.total_size_bytes += - arrow2::compute::aggregate::estimated_bytes_size(rows_single) as u64; + self.total_size_bytes += arrow2::compute::aggregate::estimated_bytes_size(&*values) as u64; // TODO(#589): support for non-unit-length chunks - self.chunks.push(rows_single.to_boxed()); // shallow + self.chunks.push(values); self.chunks.len() as u64 - 1 } diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index d5729e63763c..425f850c5b07 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -4,7 +4,6 @@ use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use arrow2::array::UInt64Array; use rand::Rng; use re_arrow_store::{ @@ -16,8 +15,8 @@ use re_log_types::{ build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d, }, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::{wrap_in_listarray, Component as _, ComponentBundle}, - Duration, EntityPath, MsgId, Time, TimeType, Timeline, + msg_bundle::Component as _, + DataCell, Duration, EntityPath, MsgId, Time, TimeType, Timeline, }; // --- @@ -29,67 +28,8 @@ fn write_errors() { let ent_path = EntityPath::from("this/that"); { - use arrow2::compute::concatenate::concatenate; - - let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let mut bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ - build_some_instances(10), build_some_point2d(10) - ]); - - // make instances 2 rows long - bundle.components[0] = ComponentBundle::new_from_boxed( - bundle.components[0].name(), - concatenate(&[ - bundle.components[0].value_list(), - bundle.components[0].value_list(), - ]) - .unwrap() - .as_ref(), - ); - - // The first component of the bundle determines the number of rows for all other - // components in there (since it has to match for all of them), so in this case we get a - // `MoreThanOneRow` 
error as the first component is > 1 row. - assert!(matches!( - store.insert(&bundle), - Err(WriteError::MoreThanOneRow(_)), - )); - } - - { - use arrow2::compute::concatenate::concatenate; - - let mut store = DataStore::new(InstanceKey::name(), Default::default()); - let mut bundle = test_bundle!(ent_path @ - [build_frame_nr(32.into()), build_log_time(Time::now())] => [ - build_some_instances(10), build_some_point2d(10) - ]); - - // make component 2 rows long - bundle.components[1] = ComponentBundle::new_from_boxed( - bundle.components[1].name(), - concatenate(&[ - bundle.components[1].value_list(), - bundle.components[1].value_list(), - ]) - .unwrap() - .as_ref(), - ); - - // The first component of the bundle determines the number of rows for all other - // components in there (since it has to match for all of them), so in this case we get a - // `MismatchedRows` error as the first component is 1 row but the 2nd isn't. - assert!(matches!( - store.insert(&bundle), - Err(WriteError::MismatchedRows(_)), - )); - } - - { - pub fn build_sparse_instances() -> ComponentBundle { - let ids = wrap_in_listarray(UInt64Array::from(vec![Some(1), None, Some(3)]).boxed()); - ComponentBundle::new(InstanceKey::name(), ids) + pub fn build_sparse_instances() -> DataCell { + DataCell::from_component_sparse::([Some(1), None, Some(3)]) } let mut store = DataStore::new(InstanceKey::name(), Default::default()); @@ -98,20 +38,18 @@ fn write_errors() { build_sparse_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::SparseClusteringComponent(_)), )); } { - pub fn build_unsorted_instances() -> ComponentBundle { - let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 3, 2]).boxed()); - ComponentBundle::new(InstanceKey::name(), ids) + pub fn build_unsorted_instances() -> DataCell { + DataCell::from_component::([1, 3, 2]) } - pub fn build_duped_instances() -> ComponentBundle { - let ids = 
wrap_in_listarray(UInt64Array::from_vec(vec![1, 2, 2]).boxed()); - ComponentBundle::new(InstanceKey::name(), ids) + pub fn build_duped_instances() -> DataCell { + DataCell::from_component::([1, 2, 2]) } let mut store = DataStore::new(InstanceKey::name(), Default::default()); @@ -121,7 +59,7 @@ fn write_errors() { build_unsorted_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::InvalidClusteringComponent(_)), )); } @@ -131,7 +69,7 @@ fn write_errors() { build_duped_instances(), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::InvalidClusteringComponent(_)), )); } @@ -144,7 +82,7 @@ fn write_errors() { build_some_instances(4), build_some_point2d(3) ]); assert!(matches!( - store.insert(&bundle), + store.insert_row(&bundle), Err(WriteError::MismatchedInstances { .. }), )); } @@ -172,7 +110,7 @@ fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) { let num_instances = 3; store - .insert( + .insert_row( &test_bundle!(ent_path @ [build_log_time(now), build_frame_nr(frame40)] => [ build_some_instances(num_instances), ]), @@ -311,7 +249,7 @@ fn range_join_across_single_row_impl(store: &mut DataStore) { let colors = build_some_colors(3); let bundle = test_bundle!(ent_path @ [build_frame_nr(42.into())] => [points.clone(), colors.clone()]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); let query = re_arrow_store::RangeQuery::new( @@ -376,7 +314,7 @@ fn gc_correct() { let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ build_some_colors(num_instances), ]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } } diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 3166d88c0fc0..e4eb6f1598bc 100644 --- 
a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -6,7 +6,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use arrow2::array::{Array, UInt64Array}; use nohash_hasher::IntMap; use polars_core::{prelude::*, series::Series}; use polars_ops::prelude::DataFrameJoinOps; @@ -22,8 +21,8 @@ use re_log_types::{ build_some_point2d, build_some_rects, }, external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::{wrap_in_listarray, Component as _, MsgBundle}, - ComponentName, EntityPath, MsgId, TimeType, Timeline, + msg_bundle::{Component as _, MsgBundle}, + ComponentName, DataCell, EntityPath, MsgId, TimeType, Timeline, }; // --- LatestComponentsAt --- @@ -96,19 +95,19 @@ fn all_components() { ]; let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [ build_frame_nr(frame1), ] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [ build_frame_nr(frame2), ] => [build_some_rects(2), build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -164,20 +163,20 @@ fn all_components() { ]; let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_instances(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); 
assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -236,25 +235,25 @@ fn all_components() { ]; let bundle = test_bundle!(ent_path @ [] => [build_some_colors(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_a)); let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_point2d(2)]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); assert_latest_components_at(&mut store, &ent_path, Some(components_b)); @@ -298,12 +297,12 @@ fn latest_at_impl(store: &mut DataStore) { // helper to insert a bundle both as a temporal and timeless payload let insert = |store: &mut DataStore, bundle| { // insert temporal - store.insert(bundle).unwrap(); + store.insert_row(bundle).unwrap(); // insert timeless let mut bundle_timeless = bundle.clone(); bundle_timeless.time_point = Default::default(); - store.insert(&bundle_timeless).unwrap(); + store.insert_row(&bundle_timeless).unwrap(); }; let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); @@ -403,12 +402,12 @@ 
fn range_impl(store: &mut DataStore) { // helper to insert a bundle both as a temporal and timeless payload let insert = |store: &mut DataStore, bundle| { // insert temporal - store.insert(bundle).unwrap(); + store.insert_row(bundle).unwrap(); // insert timeless let mut bundle_timeless = bundle.clone(); bundle_timeless.time_point = Default::default(); - store.insert(&bundle_timeless).unwrap(); + store.insert_row(&bundle_timeless).unwrap(); }; let insts1 = build_some_instances(3); @@ -865,21 +864,20 @@ fn range_impl(store: &mut DataStore) { // --- Common helpers --- /// Given a list of bundles, crafts a `latest_components`-looking dataframe. +// TODO(#1692): use Data{Cell,Row,Table} polars extensions fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) -> DataFrame { let df = bundles .iter() .map(|(component, bundle)| { let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) { - Series::try_from((cluster_key.as_str(), bundle.components[idx].value_boxed())) + Series::try_from((cluster_key.as_str(), bundle.cells[idx].as_arrow_monolist())) .unwrap() } else { - let num_instances = bundle.num_instances(0).unwrap_or(0); + let num_instances = bundle.num_instances(); Series::try_from(( cluster_key.as_str(), - wrap_in_listarray( - UInt64Array::from_vec((0..num_instances as u64).collect()).to_boxed(), - ) - .to_boxed(), + DataCell::from_component::(0..num_instances as u64) + .as_arrow_monolist(), )) .unwrap() }; @@ -889,7 +887,7 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) cluster_comp, Series::try_from(( component.as_str(), - bundle.components[comp_idx].value_boxed(), + bundle.cells[comp_idx].as_arrow_monolist(), )) .unwrap(), ]) @@ -935,7 +933,7 @@ fn gc_impl(store: &mut DataStore) { let bundle = test_bundle!(ent_path @ [build_frame_nr(frame_nr.into())] => [ build_some_rects(num_instances), ]); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } } diff --git 
a/crates/re_arrow_store/tests/internals.rs b/crates/re_arrow_store/tests/internals.rs index c7e1e5367c08..a57438470e7e 100644 --- a/crates/re_arrow_store/tests/internals.rs +++ b/crates/re_arrow_store/tests/internals.rs @@ -59,7 +59,7 @@ fn pathological_bucket_topology() { time_point.clone(), vec![build_some_instances(num_instances).try_into().unwrap()], ); - store_forward.insert(&msg).unwrap(); + store_forward.insert_row(&msg).unwrap(); let msg = MsgBundle::new( MsgId::ZERO, @@ -67,7 +67,7 @@ fn pathological_bucket_topology() { time_point.clone(), vec![build_some_instances(num_instances).try_into().unwrap()], ); - store_backward.insert(&msg).unwrap(); + store_backward.insert_row(&msg).unwrap(); } } @@ -92,11 +92,11 @@ fn pathological_bucket_topology() { .collect::>(); msgs.iter() - .for_each(|msg| store_forward.insert(msg).unwrap()); + .for_each(|msg| store_forward.insert_row(msg).unwrap()); msgs.iter() .rev() - .for_each(|msg| store_backward.insert(msg).unwrap()); + .for_each(|msg| store_backward.insert_row(msg).unwrap()); } store_repeated_frame(1000, 10, &mut store_forward, &mut store_backward); diff --git a/crates/re_data_store/Cargo.toml b/crates/re_data_store/Cargo.toml index d290e7445d2d..f6b85076b5d7 100644 --- a/crates/re_data_store/Cargo.toml +++ b/crates/re_data_store/Cargo.toml @@ -34,7 +34,7 @@ re_string_interner.workspace = true ahash.workspace = true anyhow = "1.0" document-features = "0.2" -itertools = "0.10" +itertools = { workspace = true } nohash-hasher = "0.2" once_cell = "1.15.0" serde = { version = "1", features = ["derive", "rc"], optional = true } diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index ac7b5d6db37d..3dd5932e175d 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -4,9 +4,9 @@ use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt}; use re_log_types::{ component_types::InstanceKey, 
external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, - msg_bundle::{Component as _, ComponentBundle, MsgBundle}, - ArrowMsg, BeginRecordingMsg, ComponentPath, EntityPath, EntityPathHash, EntityPathOpMsg, - LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, Timeline, + msg_bundle::{Component as _, MsgBundle}, + ArrowMsg, BeginRecordingMsg, ComponentPath, DataCell, EntityPath, EntityPathHash, + EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -85,10 +85,10 @@ impl EntityDb { self.register_entity_path(&msg_bundle.entity_path); - for component in &msg_bundle.components { + for cell in &msg_bundle.cells { let component_path = - ComponentPath::new(msg_bundle.entity_path.clone(), component.name()); - if component.name() == MsgId::name() { + ComponentPath::new(msg_bundle.entity_path.clone(), cell.component_name()); + if cell.component_name() == MsgId::name() { continue; } let pending_clears = self @@ -98,22 +98,22 @@ impl EntityDb { for (msg_id, time_point) in pending_clears { // Create and insert an empty component into the arrow store // TODO(jleibs): Faster empty-array creation - let bundle = - ComponentBundle::new_empty(component.name(), component.data_type().clone()); + let cell = + DataCell::from_arrow_empty(cell.component_name(), cell.datatype().clone()); let msg_bundle = MsgBundle::new( msg_id, msg_bundle.entity_path.clone(), time_point.clone(), - vec![bundle], + vec![cell], ); - self.data_store.insert(&msg_bundle).ok(); + self.data_store.insert_row(&msg_bundle).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(&time_point, &component_path); } } - self.data_store.insert(&msg_bundle).map_err(Into::into) + self.data_store.insert_row(&msg_bundle).map_err(Into::into) } fn add_path_op(&mut self, msg_id: MsgId, time_point: &TimePoint, path_op: &PathOp) { @@ -126,15 +126,15 @@ impl EntityDb { { // Create and insert an empty component 
into the arrow store // TODO(jleibs): Faster empty-array creation - let bundle = - ComponentBundle::new_empty(component_path.component_name, data_type.clone()); + let cell = + DataCell::from_arrow_empty(component_path.component_name, data_type.clone()); let msg_bundle = MsgBundle::new( msg_id, component_path.entity_path.clone(), time_point.clone(), - vec![bundle], + vec![cell], ); - self.data_store.insert(&msg_bundle).ok(); + self.data_store.insert_row(&msg_bundle).ok(); // Also update the tree with the clear-event self.tree.add_data_msg(time_point, &component_path); } diff --git a/crates/re_log_types/Cargo.toml b/crates/re_log_types/Cargo.toml index 6bafe68784a3..59bb931e3712 100644 --- a/crates/re_log_types/Cargo.toml +++ b/crates/re_log_types/Cargo.toml @@ -66,6 +66,7 @@ bytemuck = "1.11" document-features = "0.2" fixed = { version = "1.17", default-features = false, features = ["serde"] } half = { workspace = true, features = ["bytemuck"] } +itertools = { workspace = true } lazy_static.workspace = true ndarray.workspace = true nohash-hasher = "0.2" diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index da0e1d7eee0b..9ea0dbc00c23 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -170,6 +170,10 @@ mod tests { ) .unwrap(); + // TODO(#1619): test the full roundtrip: + // cell -> row -> table_in -> msg_in -> msg_out -> table_out + // => msg_in == msg_out + // => table_in == table_out let msg_in: ArrowMsg = bundle.try_into().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); diff --git a/crates/re_log_types/src/component_types/instance_key.rs b/crates/re_log_types/src/component_types/instance_key.rs index 6c846db45884..9d0e8eb4eac5 100644 --- a/crates/re_log_types/src/component_types/instance_key.rs +++ b/crates/re_log_types/src/component_types/instance_key.rs @@ -70,6 +70,13 @@ impl std::fmt::Display for 
InstanceKey { } } +impl From for InstanceKey { + #[inline] + fn from(value: u64) -> Self { + Self(value) + } +} + impl Component for InstanceKey { #[inline] fn name() -> crate::ComponentName { diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs new file mode 100644 index 000000000000..74e189a42b91 --- /dev/null +++ b/crates/re_log_types/src/data_cell.rs @@ -0,0 +1,448 @@ +use itertools::Itertools as _; + +use crate::{ + msg_bundle::{Component, DeserializableComponent, SerializableComponent}, + ComponentName, +}; + +// --- + +#[derive(thiserror::Error, Debug)] +pub enum DataCellError { + #[error("Unsupported datatype: {0:?}")] + UnsupportedDatatype(arrow2::datatypes::DataType), + + #[error("Could not serialize/deserialize component instances to/from Arrow: {0}")] + Arrow(#[from] arrow2::error::Error), +} + +pub type DataCellResult = ::std::result::Result; + +// --- + +/// A cell's worth of data, i.e. a uniform array of values for a given component type. +/// This is the leaf type in our data model. +/// +/// A `DataCell` can be constructed from either an iterable of native `Component`s or directly +/// from a slice of arrow data. +/// +/// Behind the scenes, a `DataCell` is backed by an erased arrow array living on the heap, which +/// is likely to point into a larger batch of contiguous memory that it shares with its peers. +/// Cloning a `DataCell` is thus cheap (shallow, ref-counted). +/// +/// ## Layout +/// +/// A cell is an array of component instances: `[C, C, C, ...]`. 
+/// +/// Consider this example: +/// ```ignore +/// let points: &[Point2D] = &[ +/// [10.0, 10.0].into(), +/// [20.0, 20.0].into(), +/// [30.0, 30.0].into(), +/// ]; +/// let cell = DataCell::from(points); +/// // Or, alternatively: +/// let cell = DataCell::from_component::([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]); +/// ``` +/// +/// The cell's datatype is now a `StructArray`: +/// ```ignore +/// Struct([ +/// Field { name: "x", data_type: Float32, is_nullable: false, metadata: {} }, +/// Field { name: "y", data_type: Float32, is_nullable: false, metadata: {} }, +/// ]) +/// ``` +/// +/// Or, visualized as a cell within a larger table: +/// ```text +/// ┌──────────────────────────────────────────────────┐ +/// │ rerun.point2d │ +/// ╞══════════════════════════════════════════════════╡ +/// │ [{x: 10, y: 10}, {x: 20, y: 20}, {x: 30, y: 30}] │ +/// └──────────────────────────────────────────────────┘ +/// ``` +/// +/// ## Example +/// +/// ```rust +/// # use arrow2_convert::field::ArrowField as _; +/// # use itertools::Itertools as _; +/// # +/// # use re_log_types::{DataCell, msg_bundle::Component as _}; +/// # use re_log_types::component_types::Point2D; +/// # +/// let points: &[Point2D] = &[ +/// [10.0, 10.0].into(), +/// [20.0, 20.0].into(), +/// [30.0, 30.0].into(), +/// ]; +/// let _cell = DataCell::from(points); +/// +/// // Or, alternatively: +/// let cell = DataCell::from_component::([[10.0, 10.0], [20.0, 20.0], [30.0, 30.0]]); +/// +/// eprintln!("{:#?}", cell.datatype()); +/// eprintln!("{cell}"); +/// # +/// # assert_eq!(Point2D::name(), cell.component_name()); +/// # assert_eq!(3, cell.num_instances()); +/// # assert_eq!(cell.datatype(), &Point2D::data_type()); +/// # +/// # assert_eq!(points, cell.as_native().collect_vec().as_slice()); +/// ``` +/// +#[derive(Debug, Clone)] +pub struct DataCell { + /// Name of the component type used in this cell. + // + // TODO(#1696): Store this within the datatype itself. 
+ pub(crate) name: ComponentName, + + /// A uniformly typed list of values for the given component type: `[C, C, C, ...]` + /// + /// Includes the data, its schema and probably soon the component metadata + /// (e.g. the `ComponentName`). + /// + /// Internally this is always stored as an erased arrow array to avoid bad surprises with + /// frequent boxing/unboxing down the line. + /// Internally, this is most likely a slice of another, larger array (batching!). + pub(crate) values: Box, +} + +// TODO(cmc): We should be able to build a cell from non-reference types. +// TODO(#1619): We shouldn't have to specify the component name separately, this should be +// part of the metadata by using an extension. +// TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an +// arrow payload. +impl DataCell { + /// Builds a new `DataCell` from a uniform iterable of native component values. + /// + /// Fails if the given iterable cannot be serialized to arrow, which should never happen when + /// using Rerun's builtin components. + #[inline] + pub fn try_from_native<'a, C: SerializableComponent>( + values: impl IntoIterator, + ) -> DataCellResult { + use arrow2_convert::serialize::TryIntoArrow; + Ok(Self::from_arrow( + C::name(), + TryIntoArrow::try_into_arrow(values.into_iter())?, + )) + } + + /// Builds a new `DataCell` from a uniform iterable of native component values. + /// + /// Fails if the given iterable cannot be serialized to arrow, which should never happen when + /// using Rerun's builtin components. + #[inline] + pub fn try_from_native_sparse<'a, C: SerializableComponent>( + values: impl IntoIterator>, + ) -> DataCellResult { + use arrow2_convert::serialize::TryIntoArrow; + Ok(Self::from_arrow( + C::name(), + TryIntoArrow::try_into_arrow(values.into_iter())?, + )) + } + + /// Builds a new `DataCell` from a uniform iterable of native component values. 
+ /// + /// Panics if the given iterable cannot be serialized to arrow, which should never happen when + /// using Rerun's builtin components. + /// See [`Self::try_from_native`] for the fallible alternative. + #[inline] + pub fn from_native<'a, C: SerializableComponent>( + values: impl IntoIterator, + ) -> Self { + Self::try_from_native(values).unwrap() + } + + /// Builds a new `DataCell` from a uniform iterable of native component values. + /// + /// Panics if the given iterable cannot be serialized to arrow, which should never happen when + /// using Rerun's builtin components. + /// See [`Self::try_from_native`] for the fallible alternative. + #[inline] + pub fn from_native_sparse<'a, C: SerializableComponent>( + values: impl IntoIterator>, + ) -> Self { + Self::try_from_native_sparse(values).unwrap() + } + + /// Builds a cell from an iterable of items that can be turned into a [`Component`]. + /// + /// ⚠ Due to quirks in `arrow2-convert`, this requires consuming and collecting the passed-in + /// iterator into a vector first. + /// Prefer [`Self::from_native`] when performance matters. + pub fn from_component_sparse(values: impl IntoIterator>>) -> Self + where + C: SerializableComponent, + { + let values = values + .into_iter() + .map(|value| value.map(Into::into)) + .collect_vec(); + Self::from_native_sparse(values.iter()) + } + + /// Builds a cell from an iterable of items that can be turned into a [`Component`]. + /// + /// ⚠ Due to quirks in `arrow2-convert`, this requires consuming and collecting the passed-in + /// iterator into a vector first. + /// Prefer [`Self::from_native`] when performance matters. + pub fn from_component(values: impl IntoIterator>) -> Self + where + C: SerializableComponent, + { + let values = values.into_iter().map(Into::into).collect_vec(); + Self::from_native(values.iter()) + } + + /// Builds a new `DataCell` from an arrow array. + /// + /// Fails if the array is not a valid list of components. 
+ #[inline] + pub fn try_from_arrow( + name: ComponentName, + values: Box, + ) -> DataCellResult { + Ok(Self { name, values }) + } + + /// Builds a new `DataCell` from an arrow array. + /// + /// Panics if the array is not a valid list of components. + /// See [`Self::try_from_arrow`] for the fallible alternative. + #[inline] + pub fn from_arrow(name: ComponentName, values: Box) -> Self { + Self::try_from_arrow(name, values).unwrap() + } + + // --- + + /// Builds an empty `DataCell` from a native component type. + // + // TODO(#1595): do keep in mind there's a future not too far away where components become a + // `(component, type)` tuple kinda thing. + #[inline] + pub fn from_native_empty() -> Self { + Self::from_arrow_empty(C::name(), C::data_type()) + } + + /// Builds an empty `DataCell` from an arrow datatype. + /// + /// Fails if the datatype is not a valid component type. + #[inline] + pub fn try_from_arrow_empty( + name: ComponentName, + datatype: arrow2::datatypes::DataType, + ) -> DataCellResult { + // TODO(cmc): check that it is indeed a component datatype + Ok(Self { + name, + values: arrow2::array::new_empty_array(datatype), + }) + } + + /// Builds an empty `DataCell` from an arrow datatype. + /// + /// Panics if the datatype is not a valid component type. + /// See [`Self::try_from_arrow_empty`] for a fallible alternative. + #[inline] + pub fn from_arrow_empty(name: ComponentName, datatype: arrow2::datatypes::DataType) -> Self { + Self::try_from_arrow_empty(name, datatype).unwrap() + } + + // --- + + /// Returns the contents of the cell as an arrow array (shallow clone). + /// + /// Avoid using raw arrow arrays unless you absolutely have to: prefer working directly with + /// `DataCell`s, `DataRow`s & `DataTable`s instead. + /// If you do use them, try to keep the scope as short as possible: holding on to a raw array + /// might prevent the datastore from releasing memory from garbage collected data. 
+ #[inline] + pub fn as_arrow(&self) -> Box { + self.values.clone() /* shallow */ + } + + /// Returns the contents of the cell as a reference to an arrow array. + /// + /// Avoid using raw arrow arrays unless you absolutely have to: prefer working directly with + /// `DataCell`s, `DataRow`s & `DataTable`s instead. + /// If you do use them, try to keep the scope as short as possible: holding on to a raw array + /// might prevent the datastore from releasing memory from garbage collected data. + #[inline] + pub fn as_arrow_ref(&self) -> &dyn arrow2::array::Array { + &*self.values + } + + /// Returns the contents of the cell as an arrow array (shallow clone) wrapped in a unit-length + /// list-array. + /// + /// Useful when dealing with cells of different lengths in context that don't allow for it. + /// + /// * Before: `[C, C, C, ...]` + /// * After: `ListArray[ [C, C, C, C] ]` + // + // TODO(#1696): this shouldn't be public, need to make it private once the store has been + // patched to use datacells directly. + // TODO(cmc): effectively, this returns a `DataColumn`... think about that. + #[doc(hidden)] + #[inline] + pub fn as_arrow_monolist(&self) -> Box { + use arrow2::{array::ListArray, offset::Offsets}; + + let values = self.as_arrow(); + let datatype = self.datatype().clone(); + + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(std::iter::once(self.num_instances() as usize)) + .unwrap() + .into(); + let validity = None; + + ListArray::::new(datatype, offsets, values, validity).boxed() + } + + /// Returns the contents of the cell as an iterator of native components. + /// + /// Fails if the underlying arrow data cannot be deserialized into `C`. + // + // TODO(#1694): There shouldn't need to be HRTBs (Higher-Rank Trait Bounds) here. 
+ #[inline] + pub fn try_as_native( + &self, + ) -> DataCellResult + '_> + where + for<'a> &'a C::ArrayType: IntoIterator, + { + use arrow2_convert::deserialize::arrow_array_deserialize_iterator; + arrow_array_deserialize_iterator(&*self.values).map_err(Into::into) + } + + /// Returns the contents of the cell as an iterator of native components. + /// + /// Panics if the underlying arrow data cannot be deserialized into `C`. + /// See [`Self::try_as_native`] for a fallible alternative. + // + // TODO(#1694): There shouldn't need to be HRTBs here. + #[inline] + pub fn as_native(&self) -> impl Iterator + '_ + where + for<'a> &'a C::ArrayType: IntoIterator, + { + self.try_as_native().unwrap() + } +} + +impl DataCell { + /// The name of the component type stored in the cell. + #[inline] + pub fn component_name(&self) -> ComponentName { + self.name + } + + /// The type of the component stored in the cell, i.e. the cell is an array of that type. + #[inline] + pub fn datatype(&self) -> &arrow2::datatypes::DataType { + self.values.data_type() + } + + /// The length of the cell's array, i.e. how many component instances are in the cell? + #[inline] + pub fn num_instances(&self) -> u32 { + self.values.len() as _ + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } + + /// Returns `true` if the underlying array is dense (no nulls). + #[inline] + pub fn is_dense(&self) -> bool { + if let Some(validity) = self.as_arrow_ref().validity() { + validity.unset_bits() == 0 + } else { + true + } + } + + /// Returns `true` if the underlying array is both sorted (increasing order) and contains only + /// unique values. + /// + /// The cell must be dense, otherwise the result of this method is undefined. 
+ pub fn is_sorted_and_unique(&self) -> DataCellResult { + use arrow2::{ + array::{Array, PrimitiveArray}, + datatypes::DataType, + types::NativeType, + }; + + debug_assert!(self.is_dense()); + + let arr = self.as_arrow_ref(); + + fn is_sorted_and_unique_primitive(arr: &dyn Array) -> bool { + // NOTE: unwrap cannot fail, checked by caller just below + let values = arr.as_any().downcast_ref::>().unwrap(); + values.values().windows(2).all(|v| v[0] < v[1]) + } + + // TODO(cmc): support more datatypes as the need arise. + match arr.data_type() { + DataType::Int8 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::Int16 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::Int32 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::Int64 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::UInt8 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::UInt16 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::UInt32 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::UInt64 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::Float32 => Ok(is_sorted_and_unique_primitive::(arr)), + DataType::Float64 => Ok(is_sorted_and_unique_primitive::(arr)), + _ => Err(DataCellError::UnsupportedDatatype(arr.data_type().clone())), + } + } +} + +// --- + +// TODO(#1693): this should be `C: Component`, nothing else. + +impl From<&[C]> for DataCell { + #[inline] + fn from(values: &[C]) -> Self { + Self::from_native(values.iter()) + } +} + +impl From> for DataCell { + #[inline] + fn from(c: Vec) -> Self { + c.as_slice().into() + } +} + +impl From<&Vec> for DataCell { + #[inline] + fn from(c: &Vec) -> Self { + c.as_slice().into() + } +} + +// --- + +impl std::fmt::Display for DataCell { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + re_format::arrow::format_table( + // NOTE: wrap in a ListArray so that it looks more cell-like (i.e. 
single row) + [&*self.as_arrow_monolist()], + [self.component_name()], + ) + .fmt(f) + } +} diff --git a/crates/re_log_types/src/datagen.rs b/crates/re_log_types/src/datagen.rs index d88dbbb0888c..153545bc05d8 100644 --- a/crates/re_log_types/src/datagen.rs +++ b/crates/re_log_types/src/datagen.rs @@ -1,5 +1,7 @@ //! Generate random data for tests and benchmarks. +// TODO(#1619): It really is time for whole module to disappear. + use crate::{ component_types::{self, InstanceKey}, Time, TimeInt, TimeType, Timeline, diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 6da17dc35596..72c8779e9092 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -12,8 +12,8 @@ pub mod datagen; pub mod arrow_msg; pub mod component_types; -pub use arrow_msg::ArrowMsg; mod data; +mod data_cell; pub mod hash; mod index; pub mod msg_bundle; @@ -34,6 +34,7 @@ pub mod external { pub use image; } +pub use self::arrow_msg::ArrowMsg; pub use self::component_types::context; pub use self::component_types::coordinates; pub use self::component_types::AnnotationContext; @@ -42,6 +43,7 @@ pub use self::component_types::MsgId; pub use self::component_types::ViewCoordinates; pub use self::component_types::{EncodedMesh3D, Mesh3D, MeshFormat, MeshId, RawMesh3D}; pub use self::data::*; +pub use self::data_cell::{DataCell, DataCellError, DataCellResult}; pub use self::index::*; pub use self::path::*; pub use self::time::{Duration, Time}; diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs index 05ea11410ed0..9092a561b345 100644 --- a/crates/re_log_types/src/msg_bundle.rs +++ b/crates/re_log_types/src/msg_bundle.rs @@ -21,7 +21,7 @@ use std::collections::BTreeMap; use arrow2::{ - array::{new_empty_array, Array, ListArray, StructArray}, + array::{Array, ListArray, StructArray}, chunk::Chunk, datatypes::{DataType, Field, Schema}, offset::Offsets, @@ -32,6 +32,13 @@ use arrow2_convert::{ 
serialize::{ArrowSerialize, TryIntoArrow}, }; +use crate::{ + parse_entity_path, ArrowMsg, ComponentName, DataCell, DataCellError, EntityPath, MsgId, + PathParseError, TimePoint, +}; + +// --- + /// The errors that can occur when trying to convert between Arrow and `MessageBundle` types #[derive(thiserror::Error, Debug)] pub enum MsgBundleError { @@ -59,6 +66,9 @@ pub enum MsgBundleError { #[error("Could not serialize components to Arrow")] ArrowSerializationError(#[from] arrow2::error::Error), + #[error(transparent)] + DataCell(#[from] DataCellError), + // Needed to handle TryFrom -> T #[error("Infallible")] Unreachable(#[from] std::convert::Infallible), @@ -66,9 +76,7 @@ pub enum MsgBundleError { pub type Result = std::result::Result; -use crate::{ - parse_entity_path, ArrowMsg, ComponentName, EntityPath, MsgId, PathParseError, TimePoint, -}; +// --- //TODO(john) get rid of this eventually const ENTITY_PATH_KEY: &str = "RERUN:entity_path"; @@ -76,6 +84,8 @@ const ENTITY_PATH_KEY: &str = "RERUN:entity_path"; const COL_COMPONENTS: &str = "components"; const COL_TIMELINES: &str = "timelines"; +// TODO(#1619): why is Component declared here? + /// A type that can used as a Component of an Entity. /// /// Examples of components include positions and colors. @@ -89,6 +99,9 @@ pub trait Component: ArrowField { } } +// TODO(#1694): do a pass over these traits, this is incomprehensible... also why would a component +// ever not be (de)serializable? Do keep in mind the whole (component, datatype) story though. + /// A [`Component`] that fulfils all the conditions required to be serialized as an Arrow payload. pub trait SerializableComponent where @@ -129,192 +142,20 @@ where { } -/// A [`ComponentBundle`] holds an Arrow component column, and its field name. -/// -/// A [`ComponentBundle`] can be created from a collection of any element that implements the -/// [`Component`] and [`ArrowSerialize`] traits. 
-/// -/// # Example -/// -/// ``` -/// # use re_log_types::{component_types::Point2D, msg_bundle::ComponentBundle}; -/// let points = vec![Point2D { x: 0.0, y: 1.0 }]; -/// let bundle = ComponentBundle::try_from(points).unwrap(); -/// ``` -#[derive(Debug, Clone)] -pub struct ComponentBundle { - /// The name of the Component, used as column name in the table `Field`. - name: ComponentName, - - /// The Component payload `Array`. - value: ListArray, -} - -impl ComponentBundle { - #[inline] - pub fn new_empty(name: ComponentName, data_type: DataType) -> Self { - Self { - name, - value: wrap_in_listarray(new_empty_array(data_type)), - } - } - - #[inline] - pub fn new(name: ComponentName, value: ListArray) -> Self { - Self { name, value } - } - - /// Create a new `ComponentBundle` from a boxed `Array`. The `Array` must be a `ListArray`. - #[inline] - pub fn new_from_boxed(name: ComponentName, value: &dyn Array) -> Self { - Self { - name, - value: value - .as_any() - .downcast_ref::>() - .unwrap() - .clone(), - } - } - - /// Returns the datatype of the bundled component, discarding the list array that wraps it (!). - #[inline] - pub fn data_type(&self) -> &DataType { - ListArray::::get_child_type(self.value.data_type()) - } - - #[inline] - pub fn name(&self) -> ComponentName { - self.name - } - - /// Get the `ComponentBundle` value as a boxed `Array`. - #[inline] - pub fn value_boxed(&self) -> Box { - self.value.to_boxed() - } - - /// Get the `ComponentBundle` value - #[inline] - pub fn value_list(&self) -> &ListArray { - &self.value - } - - /// Returns the number of _rows_ in this bundle, i.e. the length of the bundle. - /// - /// Currently always 1 as we don't yet support batch insertions. - #[inline] - pub fn num_rows(&self) -> usize { - self.value.len() - } - - /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a - /// specific row within the bundle. 
- #[inline] - pub fn num_instances(&self, row: usize) -> Option { - self.value.offsets().lengths().nth(row) - } -} - -impl TryFrom<&[C]> for ComponentBundle { - type Error = MsgBundleError; - - fn try_from(c: &[C]) -> Result { - let array: Box = TryIntoArrow::try_into_arrow(c)?; - let wrapped = wrap_in_listarray(array); - Ok(ComponentBundle::new(C::name(), wrapped)) - } -} - -impl TryFrom> for ComponentBundle { - type Error = MsgBundleError; - - fn try_from(c: Vec) -> Result { - c.as_slice().try_into() - } -} - -impl TryFrom<&Vec> for ComponentBundle { - type Error = MsgBundleError; - - fn try_from(c: &Vec) -> Result { - c.as_slice().try_into() - } -} - -// TODO(cmc): We'd like this, but orphan rules prevent us from having it: -// -// ``` -// = note: conflicting implementation in crate `core`: -// - impl std::convert::TryFrom for T -// where U: std::convert::Into; -// ``` -// -// impl<'a, C: SerializableComponent, I: IntoIterator> TryFrom for ComponentBundle { -// type Error = MsgBundleError; - -// fn try_from(c: I) -> Result { -// c.as_slice().try_into() -// } -// } +// --- /// A `MsgBundle` holds data necessary for composing a single log message. 
-/// -/// # Example -/// -/// Create a `MsgBundle` and add a component consisting of 2 [`crate::component_types::Rect2D`] values: -/// ``` -/// # use re_log_types::{component_types::Rect2D, msg_bundle::MsgBundle, MsgId, EntityPath, TimePoint}; -/// let component = vec![ -/// Rect2D::from_xywh(0.0, 0.0, 0.0, 0.0), -/// Rect2D::from_xywh(1.0, 1.0, 0.0, 0.0) -/// ]; -/// let mut bundle = MsgBundle::new(MsgId::ZERO, EntityPath::root(), TimePoint::default(), vec![]); -/// bundle.try_append_component(&component).unwrap(); -/// println!("{:?}", &bundle.components[0].value_boxed()); -/// ``` -/// -/// The resultant Arrow [`arrow2::array::Array`] for the `Rect2D` component looks as follows: -/// ```text -/// ┌─────────────────┬──────────────────────────────┐ -/// │ rerun.msg_id ┆ rerun.rect2d │ -/// │ --- ┆ --- │ -/// │ list[struct[2]] ┆ list[union[6]] │ -/// ╞═════════════════╪══════════════════════════════╡ -/// │ [] ┆ [[0, 0, 0, 0], [1, 1, 0, 0]] │ -/// └─────────────────┴──────────────────────────────┘ -/// ``` -/// The `MsgBundle` can then also be converted into an [`crate::arrow_msg::ArrowMsg`]: -/// -/// ``` -/// # use re_log_types::{ArrowMsg, component_types::Rect2D, msg_bundle::MsgBundle, MsgId, EntityPath, TimePoint}; -/// # let mut bundle = MsgBundle::new(MsgId::ZERO, EntityPath::root(), TimePoint::default(), vec![]); -/// # bundle.try_append_component(re_log_types::datagen::build_some_rects(2).iter()).unwrap(); -/// let msg: ArrowMsg = bundle.try_into().unwrap(); -/// dbg!(&msg); -/// ``` -/// -/// And the resulting Arrow [`arrow2::array::Array`] in the [`ArrowMsg`] looks as follows: -/// ```text -/// ┌─────────────────┬────────────────────────────────────────────────────────────────┐ -/// │ timelines ┆ components │ -/// │ --- ┆ --- │ -/// │ list[struct[3]] ┆ struct[2] │ -/// ╞═════════════════╪════════════════════════════════════════════════════════════════╡ -/// │ [] ┆ {rerun.msg_id: [], rerun.rect2d: [[0, 0, 0, 0], [1, 1, 0, 0]]} │ -/// 
└─────────────────┴────────────────────────────────────────────────────────────────┘ -/// ``` #[derive(Clone, Debug)] pub struct MsgBundle { /// A unique id per [`crate::LogMsg`]. pub msg_id: MsgId, pub entity_path: EntityPath, pub time_point: TimePoint, - pub components: Vec, + pub cells: Vec, } impl MsgBundle { - /// Create a new `MsgBundle` with a pre-built Vec of [`ComponentBundle`] components. + /// Create a new `MsgBundle` with a pre-built Vec of [`DataCell`] components. /// /// The `MsgId` will automatically be appended as a component to the given `bundles`, allowing /// the backend to keep track of the origin of any row of data. @@ -322,63 +163,29 @@ impl MsgBundle { msg_id: MsgId, entity_path: EntityPath, time_point: TimePoint, - components: Vec, + components: Vec, ) -> Self { let mut this = Self { msg_id, entity_path, time_point, - components, + cells: components, }; // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, // we need to craft an array of `MsgId`s that matches the length of the other components. - if let Some(num_instances) = this.num_instances(0) { - this.try_append_component(&vec![msg_id; num_instances]) - .unwrap(); - } + this.cells.push(DataCell::from_native( + vec![msg_id; this.num_instances()].iter(), + )); this } - /// Try to append a collection of `Component` onto the `MessageBundle`. - /// - /// This first converts the component collection into an Arrow array, and then wraps it in a [`ListArray`]. - pub fn try_append_component<'a, Component, Collection>( - &mut self, - component: Collection, - ) -> Result<()> - where - Component: SerializableComponent, - Collection: IntoIterator, - { - let array: Box = TryIntoArrow::try_into_arrow(component)?; - let wrapped = wrap_in_listarray(array); - - let bundle = ComponentBundle::new(Component::name(), wrapped); - - self.components.push(bundle); - Ok(()) - } - /// Returns the number of component collections in this bundle, i.e. the length of the bundle /// itself. 
#[inline] pub fn num_components(&self) -> usize { - self.components.len() - } - - /// Returns the number of _rows_ for each component collections in this bundle, i.e. the - /// length of each component collections. - /// - /// All component collections within a `MsgBundle` must share the same number of rows! - /// - /// Currently always 1 as we don't yet support batch insertions. - #[inline] - pub fn num_rows(&self) -> usize { - self.components - .first() - .map_or(0, |bundle| bundle.num_rows()) + self.cells.len() } /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a @@ -388,10 +195,10 @@ impl MsgBundle { /// have the same number of instances, we simply pick the value for the first component /// collection. #[inline] - pub fn num_instances(&self, row: usize) -> Option { - self.components + pub fn num_instances(&self) -> usize { + self.cells .first() - .map_or(Some(0), |bundle| bundle.num_instances(row)) + .map_or(0, |cell| cell.num_instances() as _) } /// Returns the index of `component` in the bundle, if it exists. @@ -399,17 +206,17 @@ impl MsgBundle { /// This is `O(n)`. 
#[inline] pub fn find_component(&self, component: &ComponentName) -> Option { - self.components + self.cells .iter() - .map(|bundle| bundle.name) + .map(|cell| cell.component_name()) .position(|name| name == *component) } } impl std::fmt::Display for MsgBundle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let values = self.components.iter().map(|bundle| bundle.value_boxed()); - let names = self.components.iter().map(|bundle| bundle.name.as_str()); + let values = self.cells.iter().map(|cell| cell.as_arrow_ref()); + let names = self.cells.iter().map(|cell| cell.component_name().as_str()); let table = re_format::arrow::format_table(values, names); f.write_fmt(format_args!( "MsgBundle '{}' @ {:?}:\n{table}", @@ -418,18 +225,21 @@ impl std::fmt::Display for MsgBundle { } } -/// Pack the passed iterator of `ComponentBundle` into a `(Schema, StructArray)` tuple. +/// Pack the passed iterator of [`DataCell`] into a `(Schema, StructArray)` tuple. #[inline] -fn pack_components(components: impl Iterator) -> (Schema, StructArray) { - let (component_fields, component_cols): (Vec, Vec>) = components - .map(|bundle| { - let ComponentBundle { - name, - value: component, - } = bundle; +fn pack_components(cells: impl Iterator) -> (Schema, StructArray) { + let (component_fields, component_cols): (Vec, Vec>) = cells + .map(|cell| { + // NOTE: wrap in a ListArray to emulate the presence of rows, this'll go away with + // batching. 
+ let data = cell.as_arrow_monolist(); ( - Field::new(name.as_str(), component.data_type().clone(), false), - component.to_boxed(), + Field::new( + cell.component_name().as_str(), + data.data_type().clone(), + false, + ), + data, ) }) .unzip(); @@ -438,11 +248,9 @@ fn pack_components(components: impl Iterator) -> (Schema let packed = StructArray::new(data_type, component_cols, None); let schema = Schema { - fields: [Field::new( - COL_COMPONENTS, - packed.data_type().clone(), - false, - )] + fields: [ + Field::new(COL_COMPONENTS, packed.data_type().clone(), false), // + ] .to_vec(), ..Default::default() }; @@ -476,7 +284,7 @@ impl TryFrom<&ArrowMsg> for MsgBundle { msg_id: *msg_id, entity_path: entity_path_cmp.into(), time_point, - components, + cells: components, }) } } @@ -501,7 +309,7 @@ impl TryFrom for ArrowMsg { cols.push(timelines_col); // Build & pack components - let (components_schema, components_data) = pack_components(bundle.components.into_iter()); + let (components_schema, components_data) = pack_components(bundle.cells.into_iter()); schema.fields.extend(components_schema.fields); schema.metadata.extend(components_schema.metadata); @@ -543,12 +351,9 @@ pub fn extract_timelines(schema: &Schema, chunk: &Chunk>) -> Resu Ok(timepoint) } -/// Extract a vector of `ComponentBundle` from the message. This is necessary since the +/// Extract a vector of `DataCell` from the message. This is necessary since the /// "components" schema is flexible. 
-fn extract_components( - schema: &Schema, - msg: &Chunk>, -) -> Result> { +fn extract_components(schema: &Schema, msg: &Chunk>) -> Result> { let components = schema .fields .iter() @@ -566,10 +371,14 @@ fn extract_components( .iter() .zip(components.values()) .map(|(field, component)| { - ComponentBundle::new_from_boxed( - ComponentName::from(field.name.as_str()), - component.as_ref(), - ) + // NOTE: unwrap the ListArray layer that we added during packing in order to emulate + // the presence of rows, this'll go away with batching. + let component = component + .as_any() + .downcast_ref::>() + .unwrap() + .values(); + DataCell::from_arrow(ComponentName::from(field.name.as_str()), component.clone()) }) .collect()) } @@ -592,19 +401,19 @@ pub fn try_build_msg_bundle1( msg_id: MsgId, into_entity_path: O, into_time_point: T, - into_bundles: C0, + into_cells: C0, ) -> Result where O: Into, T: Into, - C0: TryInto, - MsgBundleError: From<>::Error>, + C0: TryInto, + MsgBundleError: From<>::Error>, { Ok(MsgBundle::new( msg_id, into_entity_path.into(), into_time_point.into(), - vec![into_bundles.try_into()?], + vec![into_cells.try_into()?], )) } @@ -613,21 +422,21 @@ pub fn try_build_msg_bundle2( msg_id: MsgId, into_entity_path: O, into_time_point: T, - into_bundles: (C0, C1), + into_cells: (C0, C1), ) -> Result where O: Into, T: Into, - C0: TryInto, - C1: TryInto, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, + C0: TryInto, + C1: TryInto, + MsgBundleError: From<>::Error>, + MsgBundleError: From<>::Error>, { Ok(MsgBundle::new( msg_id, into_entity_path.into(), into_time_point.into(), - vec![into_bundles.0.try_into()?, into_bundles.1.try_into()?], + vec![into_cells.0.try_into()?, into_cells.1.try_into()?], )) } @@ -636,26 +445,26 @@ pub fn try_build_msg_bundle3( msg_id: MsgId, into_entity_path: O, into_time_point: T, - into_bundles: (C0, C1, C2), + into_cells: (C0, C1, C2), ) -> Result where O: Into, T: Into, - C0: TryInto, - C1: TryInto, - C2: TryInto, 
- MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, - MsgBundleError: From<>::Error>, + C0: TryInto, + C1: TryInto, + C2: TryInto, + MsgBundleError: From<>::Error>, + MsgBundleError: From<>::Error>, + MsgBundleError: From<>::Error>, { Ok(MsgBundle::new( msg_id, into_entity_path.into(), into_time_point.into(), vec![ - into_bundles.0.try_into()?, - into_bundles.1.try_into()?, - into_bundles.2.try_into()?, + into_cells.0.try_into()?, + into_cells.1.try_into()?, + into_cells.2.try_into()?, ], )) } diff --git a/crates/re_memory/Cargo.toml b/crates/re_memory/Cargo.toml index 9e9a765140d9..bd4f33c96b79 100644 --- a/crates/re_memory/Cargo.toml +++ b/crates/re_memory/Cargo.toml @@ -23,7 +23,7 @@ re_log.workspace = true ahash.workspace = true emath.workspace = true instant = { version = "0.1", features = ["wasm-bindgen"] } -itertools = "0.10" +itertools = { workspace = true } nohash-hasher = "0.2" once_cell = "1.16" parking_lot.workspace = true diff --git a/crates/re_query/Cargo.toml b/crates/re_query/Cargo.toml index bc0aa04c0781..26e35e3ce0d6 100644 --- a/crates/re_query/Cargo.toml +++ b/crates/re_query/Cargo.toml @@ -39,7 +39,7 @@ arrow2 = { workspace = true, features = [ ] } document-features = "0.2" indent = "0.1" -itertools = "0.10" +itertools = { workspace = true } nohash-hasher = "0.2" thiserror.workspace = true @@ -53,7 +53,7 @@ polars-core = { workspace = true, optional = true, features = [ [dev-dependencies] criterion = "0.4" -itertools = "0.10" +itertools = { workspace = true } mimalloc.workspace = true polars-core = { workspace = true, features = [ "dtype-date", diff --git a/crates/re_query/benches/query_benchmark.rs b/crates/re_query/benches/query_benchmark.rs index d8ae38287e18..3dbeec55d6bd 100644 --- a/crates/re_query/benches/query_benchmark.rs +++ b/crates/re_query/benches/query_benchmark.rs @@ -155,7 +155,7 @@ fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec { fn insert_messages<'a>(msgs: impl Iterator) -> DataStore { let mut 
store = DataStore::new(InstanceKey::name(), Default::default()); - msgs.for_each(|msg_bundle| store.insert(msg_bundle).unwrap()); + msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap()); store } diff --git a/crates/re_query/examples/range.rs b/crates/re_query/examples/range.rs index 027bf0df99b8..3f7599132ef8 100644 --- a/crates/re_query/examples/range.rs +++ b/crates/re_query/examples/range.rs @@ -25,27 +25,27 @@ fn main() { let rects = build_some_rects(2); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame1, &rects).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let points = build_some_point2d(2); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame2, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let points = build_some_point2d(4); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame3, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let rects = build_some_rects(3); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let points = build_some_point2d(3); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let rects = build_some_rects(3); let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), frame4, &rects).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let query = RangeQuery::new(frame2[0].0, TimeRange::new(frame2[0].1, frame4[0].1)); diff --git a/crates/re_query/src/entity_view.rs b/crates/re_query/src/entity_view.rs index 4dd67ec96235..0d4cb9113628 100644 --- a/crates/re_query/src/entity_view.rs +++ b/crates/re_query/src/entity_view.rs @@ -1,7 +1,6 @@ use std::{collections::BTreeMap, 
marker::PhantomData}; use arrow2::array::{Array, MutableArray, PrimitiveArray}; -use re_arrow_store::ArrayExt; use re_format::arrow; use re_log_types::{ component_types::InstanceKey, @@ -102,9 +101,6 @@ impl ComponentWithInstances { // If `instance_keys` is set, extract the `PrimitiveArray`, and find // the index of the value by `binary_search` - // The store should guarantee this for us but assert to be sure - debug_assert!(instance_keys.is_sorted_and_unique().unwrap_or(false)); - let keys = instance_keys .as_any() .downcast_ref::>()? diff --git a/crates/re_query/src/query.rs b/crates/re_query/src/query.rs index 990ac52c0d81..e1146609f6c8 100644 --- a/crates/re_query/src/query.rs +++ b/crates/re_query/src/query.rs @@ -164,13 +164,13 @@ pub fn __populate_example_store() -> DataStore { let bundle = try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (&instances, &points)).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); let instances = vec![InstanceKey(96)]; let colors = vec![ColorRGBA(0xff000000)]; let bundle = try_build_msg_bundle2(MsgId::ZERO, ent_path, timepoint, (instances, colors)).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); store } diff --git a/crates/re_query/tests/query_tests.rs b/crates/re_query/tests/query_tests.rs index 1f65b8cf145d..8fc96583602f 100644 --- a/crates/re_query/tests/query_tests.rs +++ b/crates/re_query/tests/query_tests.rs @@ -22,7 +22,7 @@ fn simple_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; @@ -34,7 +34,7 @@ fn simple_query() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + 
store.insert_row(&bundle).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -93,14 +93,14 @@ fn timeless_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign one of them a color with an explicit instance.. timelessly! let color_instances = vec![InstanceKey(1)]; let colors = vec![ColorRGBA(0xff000000)]; let bundle = try_build_msg_bundle2(MsgId::random(), ent_path, [], (color_instances, colors)).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -159,12 +159,12 @@ fn no_instance_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign them colors with explicit instances let colors = vec![ColorRGBA(0xff000000), ColorRGBA(0x00ff0000)]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &colors).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -223,7 +223,7 @@ fn missing_column_join_query() { // Create some points with an implicit instance let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Retrieve the view 
let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); @@ -281,7 +281,7 @@ fn splatted_query() { // Create some points with implicit instances let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path, timepoint, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign all of them a color via splat let color_instances = vec![InstanceKey::SPLAT]; @@ -293,7 +293,7 @@ fn splatted_query() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Retrieve the view let timeline_query = re_arrow_store::LatestAtQuery::new(timepoint[0].0, timepoint[0].1); diff --git a/crates/re_query/tests/range_tests.rs b/crates/re_query/tests/range_tests.rs index 315f40735b5c..f8b43dac0c15 100644 --- a/crates/re_query/tests/range_tests.rs +++ b/crates/re_query/tests/range_tests.rs @@ -24,7 +24,7 @@ fn simple_range() { let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; @@ -36,7 +36,7 @@ fn simple_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -51,7 +51,7 @@ fn simple_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; @@ -60,7 +60,7 @@ fn simple_range() { let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - 
store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- @@ -244,11 +244,11 @@ fn timeless_range() { let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Insert timelessly too! let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; @@ -260,7 +260,7 @@ fn timeless_range() { (color_instances.clone(), colors.clone()), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Insert timelessly too! let bundle = try_build_msg_bundle2( @@ -270,7 +270,7 @@ fn timeless_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -285,7 +285,7 @@ fn timeless_range() { (color_instances.clone(), colors.clone()), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Insert timelessly too! let bundle = try_build_msg_bundle2( @@ -295,7 +295,7 @@ fn timeless_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; @@ -304,11 +304,11 @@ fn timeless_range() { let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Insert timelessly too! 
let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), [], &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } // ┌───────────┬──────────┬────────┬─────────────────┬────────────────────┬──────────────────────┬────────────────────────────┐ @@ -679,7 +679,7 @@ fn simple_splatted_range() { let points = vec![Point2D { x: 1.0, y: 2.0 }, Point2D { x: 3.0, y: 4.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint1, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); // Assign one of them a color with an explicit instance let color_instances = vec![InstanceKey(1)]; @@ -691,7 +691,7 @@ fn simple_splatted_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint2 = [build_frame_nr(223.into())]; @@ -706,7 +706,7 @@ fn simple_splatted_range() { (color_instances, colors), ) .unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } let timepoint3 = [build_frame_nr(323.into())]; @@ -715,7 +715,7 @@ fn simple_splatted_range() { let points = vec![Point2D { x: 10.0, y: 20.0 }, Point2D { x: 30.0, y: 40.0 }]; let bundle = try_build_msg_bundle1(MsgId::random(), ent_path.clone(), timepoint3, &points).unwrap(); - store.insert(&bundle).unwrap(); + store.insert_row(&bundle).unwrap(); } // --- First test: `(timepoint1, timepoint3]` --- diff --git a/crates/re_renderer/Cargo.toml b/crates/re_renderer/Cargo.toml index 1ced6db76d5b..a1bb8c8e45e9 100644 --- a/crates/re_renderer/Cargo.toml +++ b/crates/re_renderer/Cargo.toml @@ -53,7 +53,7 @@ ecolor = { workspace = true, features = ["bytemuck"] } enumset.workspace = true glam = { workspace = true, features = ["bytemuck"] } half = { workspace = true, features = ["bytemuck"] } -itertools = "0.10" +itertools = { workspace = true } macaw.workspace = true memoffset = "0.8" ordered-float = "3.2" diff --git 
a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index ccca1c200f7f..c46d8a33fd54 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -54,10 +54,7 @@ pub mod sink { /// Things directly related to logging. pub mod log { - pub use re_log_types::{ - msg_bundle::{ComponentBundle, MsgBundle}, - LogMsg, MsgId, PathOp, - }; + pub use re_log_types::{msg_bundle::MsgBundle, DataCell, LogMsg, MsgId, PathOp}; } /// Time-related types. diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index a63e7c133504..919bb937152c 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,15 +1,10 @@ -use re_log_types::{ - component_types::InstanceKey, - external::arrow2_convert::serialize::TryIntoArrow, - msg_bundle::{wrap_in_listarray, MsgBundleError}, -}; +use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError}; -use arrow2::array::Array; use nohash_hasher::IntMap; use crate::{ components::Transform, - log::{ComponentBundle, LogMsg, MsgBundle, MsgId}, + log::{DataCell, LogMsg, MsgBundle, MsgId}, sink::LogSink, time::{Time, TimeInt, TimePoint, Timeline}, Component, ComponentName, EntityPath, SerializableComponent, @@ -35,7 +30,7 @@ pub enum MsgSenderError { "All component collections must share the same number of instances (i.e. row length) \ for a given row, got {0:?} instead" )] - MismatchedRowLengths(Vec<(ComponentName, usize)>), + MismatchedRowLengths(Vec<(ComponentName, u32)>), /// Instance keys cannot be splatted #[error("Instance keys cannot be splatted")] @@ -69,6 +64,7 @@ pub enum MsgSenderError { /// .map_err(Into::into) /// } /// ``` +// TODO(#1619): this should embed a DataTable soon. pub struct MsgSender { // TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so // this must be known as soon as we spawn the builder. @@ -94,19 +90,19 @@ pub struct MsgSender { /// collection will always be 1. 
/// The number of instances per row, on the other hand, will be decided based upon the first /// component collection that's appended. - num_instances: Option, + num_instances: Option, /// All the instanced component collections that have been appended to this message. /// /// As of today, they must have exactly 1 row of data (no batching), which itself must have /// `Self::num_instances` instance keys. - instanced: Vec, + instanced: Vec, /// All the splatted components that have been appended to this message. /// - /// By definition, all `ComponentBundle`s in this vector will have 1 row (no batching) and more + /// By definition, all `DataCell`s in this vector will have 1 row (no batching) and more /// importantly a single, special instance key for that row. - splatted: Vec, + splatted: Vec, } impl MsgSender { @@ -190,9 +186,9 @@ impl MsgSender { mut self, data: impl IntoIterator, ) -> Result { - let bundle = bundle_from_iter(data)?; + let cell = DataCell::try_from_native(data).map_err(MsgBundleError::from)?; - let num_instances = bundle.num_instances(0).unwrap(); // must have exactly 1 row atm + let num_instances = cell.num_instances(); // If this is the first appended collection, it gets to decide the row-length (i.e. number // of instances) of all future collections. @@ -200,20 +196,20 @@ impl MsgSender { self.num_instances = Some(num_instances); } - // Detect mismatched row-lengths early on... unless it's a Transform bundle: transforms + // Detect mismatched row-lengths early on... unless it's a Transform cell: transforms // behave differently and will be sent in their own message! 
if C::name() != Transform::name() && self.num_instances.unwrap() != num_instances { let collections = self .instanced .into_iter() - .map(|bundle| (bundle.name(), bundle.num_instances(0).unwrap_or(0))) + .map(|cell| (cell.component_name(), cell.num_instances())) .collect(); return Err(MsgSenderError::MismatchedRowLengths(collections)); } // TODO(cmc): if this is an InstanceKey and it contains u64::MAX, fire IllegalInstanceKey. - self.instanced.push(bundle); + self.instanced.push(cell); Ok(self) } @@ -236,7 +232,8 @@ impl MsgSender { return Err(MsgSenderError::SplattedInstanceKeys); } - self.splatted.push(bundle_from_iter(&[data])?); + self.splatted + .push(DataCell::try_from_native(&[data]).map_err(MsgBundleError::from)?); Ok(self) } @@ -306,35 +303,35 @@ impl MsgSender { // separate transforms from the rest // TODO(cmc): just use `Vec::drain_filter` once it goes stable... - let mut all_bundles: Vec<_> = instanced.into_iter().map(Some).collect(); - let standard_bundles: Vec<_> = all_bundles + let mut all_cells: Vec<_> = instanced.into_iter().map(Some).collect(); + let standard_cells: Vec<_> = all_cells .iter_mut() - .filter(|bundle| bundle.as_ref().unwrap().name() != Transform::name()) - .map(|bundle| bundle.take().unwrap()) + .filter(|cell| cell.as_ref().unwrap().component_name() != Transform::name()) + .map(|cell| cell.take().unwrap()) .collect(); - let transform_bundles: Vec<_> = all_bundles + let transform_cells: Vec<_> = all_cells .iter_mut() - .filter(|bundle| { - bundle - .as_ref() - .map_or(false, |bundle| bundle.name() == Transform::name()) + .filter(|cell| { + cell.as_ref() + .map_or(false, |cell| cell.component_name() == Transform::name()) }) - .map(|bundle| bundle.take().unwrap()) + .map(|cell| cell.take().unwrap()) .collect(); - debug_assert!(all_bundles.into_iter().all(|bundle| bundle.is_none())); + debug_assert!(all_cells.into_iter().all(|cell| cell.is_none())); // TODO(cmc): The sanity checks we do in here can (and probably should) be done in // 
`MsgBundle` instead so that the python SDK benefits from them too... but one step at a // time. + // TODO(#1619): All of this disappears once DataRow lands. // sanity check: no row-level batching let mut rows_per_comptype: IntMap = IntMap::default(); - for bundle in standard_bundles + for cell in standard_cells .iter() - .chain(&transform_bundles) + .chain(&transform_cells) .chain(&splatted) { - *rows_per_comptype.entry(bundle.name()).or_default() += bundle.num_rows(); + *rows_per_comptype.entry(cell.component_name()).or_default() += 1; } if rows_per_comptype.values().any(|num_rows| *num_rows > 1) { return Err(MsgSenderError::MoreThanOneRow( @@ -343,10 +340,9 @@ impl MsgSender { } // sanity check: transforms can't handle multiple instances - let num_transform_instances = transform_bundles + let num_transform_instances = transform_cells .get(0) - .and_then(|bundle| bundle.num_instances(0)) - .unwrap_or(0); + .map_or(0, |cell| cell.num_instances()); if num_transform_instances > 1 { re_log::warn!("detected Transform component with multiple instances"); } @@ -354,28 +350,28 @@ impl MsgSender { let mut msgs = [(); 3].map(|_| None); // Standard - msgs[0] = (!standard_bundles.is_empty()).then(|| { + msgs[0] = (!standard_cells.is_empty()).then(|| { MsgBundle::new( MsgId::random(), entity_path.clone(), timepoint.clone(), - standard_bundles, + standard_cells, ) }); // Transforms - msgs[1] = (!transform_bundles.is_empty()).then(|| { + msgs[1] = (!transform_cells.is_empty()).then(|| { MsgBundle::new( MsgId::random(), entity_path.clone(), timepoint.clone(), - transform_bundles, + transform_cells, ) }); // Splats msgs[2] = (!splatted.is_empty()).then(|| { - splatted.push(bundle_from_iter(&[InstanceKey::SPLAT]).unwrap()); + splatted.push(DataCell::from_native(&[InstanceKey::SPLAT])); MsgBundle::new(MsgId::random(), entity_path, timepoint, splatted) }); @@ -383,18 +379,6 @@ impl MsgSender { } } -fn bundle_from_iter<'a, C: SerializableComponent>( - data: impl IntoIterator, -) -> 
Result { - // TODO(cmc): Eeeh, that's not ideal to repeat that kind of logic in here, but orphan rules - // kinda force us to :/ - - let array: Box = TryIntoArrow::try_into_arrow(data)?; - let wrapped = wrap_in_listarray(array); - - Ok(ComponentBundle::new(C::name(), wrapped)) -} - #[cfg(test)] mod tests { use super::*; @@ -428,9 +412,8 @@ mod tests { { let standard = standard.unwrap(); let idx = standard.find_component(&components::Label::name()).unwrap(); - let bundle = &standard.components[idx]; - assert!(bundle.num_rows() == 1); - assert!(bundle.num_instances(0).unwrap() == 2); + let cell = &standard.cells[idx]; + assert!(cell.num_instances() == 2); } { @@ -438,9 +421,8 @@ mod tests { let idx = transforms .find_component(&components::Transform::name()) .unwrap(); - let bundle = &transforms.components[idx]; - assert!(bundle.num_rows() == 1); - assert!(bundle.num_instances(0).unwrap() == 1); + let cell = &transforms.cells[idx]; + assert!(cell.num_instances() == 1); } { @@ -448,9 +430,8 @@ mod tests { let idx = splats .find_component(&components::ColorRGBA::name()) .unwrap(); - let bundle = &splats.components[idx]; - assert!(bundle.num_rows() == 1); - assert!(bundle.num_instances(0).unwrap() == 1); + let cell = &splats.cells[idx]; + assert!(cell.num_instances() == 1); } Ok(()) diff --git a/crates/re_viewer/Cargo.toml b/crates/re_viewer/Cargo.toml index 24a2c6ff4694..6a92be8e3141 100644 --- a/crates/re_viewer/Cargo.toml +++ b/crates/re_viewer/Cargo.toml @@ -87,7 +87,7 @@ image = { workspace = true, default-features = false, features = [ "png", ] } instant = { version = "0.1", features = ["wasm-bindgen"] } -itertools = "0.10" +itertools = { workspace = true } lazy_static.workspace = true macaw = { workspace = true, features = ["with_serde"] } mint = "0.5" diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index c248eedb6fbb..82adebb258fc 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ 
b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -106,7 +106,7 @@ impl DataUi for ArrowMsg { msg_id: _, entity_path, time_point, - components, + cells: components, }) => { egui::Grid::new("fields").num_columns(2).show(ui, |ui| { ui.monospace("entity_path:"); diff --git a/crates/re_viewer/src/ui/data_ui/mod.rs b/crates/re_viewer/src/ui/data_ui/mod.rs index 5e70eb0594c4..cb94694d0a40 100644 --- a/crates/re_viewer/src/ui/data_ui/mod.rs +++ b/crates/re_viewer/src/ui/data_ui/mod.rs @@ -1,7 +1,7 @@ //! The `DataUi` trait and implementations provide methods for representing data using [`egui`]. use itertools::Itertools; -use re_log_types::{msg_bundle::ComponentBundle, PathOp, TimePoint}; +use re_log_types::{DataCell, PathOp, TimePoint}; use crate::misc::ViewerContext; @@ -69,7 +69,7 @@ impl DataUi for TimePoint { } } -impl DataUi for [ComponentBundle] { +impl DataUi for [DataCell] { fn data_ui( &self, _ctx: &mut ViewerContext<'_>, @@ -78,17 +78,17 @@ impl DataUi for [ComponentBundle] { _query: &re_arrow_store::LatestAtQuery, ) { let mut sorted = self.to_vec(); - sorted.sort_by_key(|cb| cb.name()); + sorted.sort_by_key(|cb| cb.component_name()); match verbosity { UiVerbosity::Small | UiVerbosity::MaxHeight(_) => { - ui.label(sorted.iter().map(format_component_bundle).join(", ")); + ui.label(sorted.iter().map(format_cell).join(", ")); } UiVerbosity::All | UiVerbosity::Reduced => { ui.vertical(|ui| { for component_bundle in &sorted { - ui.label(format_component_bundle(component_bundle)); + ui.label(format_cell(component_bundle)); } }); } @@ -96,12 +96,12 @@ impl DataUi for [ComponentBundle] { } } -fn format_component_bundle(bundle: &ComponentBundle) -> String { +fn format_cell(cell: &DataCell) -> String { // TODO(emilk): if there's only once instance, and the byte size is small, then deserialize and show the value. 
format!( "{}x {}", - bundle.num_instances(0).unwrap(), // all of our bundles have exactly 1 row as of today - bundle.name().short_name() + cell.num_instances(), + cell.component_name().short_name() ) } diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index ed4ed4018188..ce00ca27cd1f 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -180,7 +180,7 @@ fn table_row( msg_id, entity_path, time_point, - components, + cells: components, }) => { row.col(|ui| { ctx.msg_id_button(ui, msg_id); diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 7074903844b0..21dbe5025db9 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -77,7 +77,7 @@ anyhow.workspace = true crossbeam = "0.8" document-features = "0.2" egui = { workspace = true, default-features = false } -itertools = "0.10" +itertools = { workspace = true } parking_lot.workspace = true puffin.workspace = true diff --git a/examples/rust/api_demo/Cargo.toml b/examples/rust/api_demo/Cargo.toml index eb0986f161b1..508fa149957a 100644 --- a/examples/rust/api_demo/Cargo.toml +++ b/examples/rust/api_demo/Cargo.toml @@ -12,7 +12,7 @@ rerun = { workspace = true, features = ["web_viewer"] } anyhow.workspace = true clap = { workspace = true, features = ["derive"] } glam.workspace = true -itertools = "0.10" +itertools = { workspace = true } ndarray.workspace = true ndarray-rand = "0.14" rand = "0.8" diff --git a/examples/rust/dna/Cargo.toml b/examples/rust/dna/Cargo.toml index f74b3d89f7ab..d4f94b92e839 100644 --- a/examples/rust/dna/Cargo.toml +++ b/examples/rust/dna/Cargo.toml @@ -9,5 +9,5 @@ publish = false [dependencies] rerun.workspace = true -itertools = "0.10" +itertools = { workspace = true } rand = "0.8" diff --git a/examples/rust/objectron/Cargo.toml b/examples/rust/objectron/Cargo.toml index 70d74911b754..ee1fe349f747 100644 --- a/examples/rust/objectron/Cargo.toml +++ 
b/examples/rust/objectron/Cargo.toml @@ -14,7 +14,7 @@ anyhow.workspace = true clap = { workspace = true, features = ["derive"] } glam.workspace = true image = { workspace = true, default-features = false, features = ["jpeg"] } -itertools = "0.10" +itertools = { workspace = true } prost = "0.11" diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 9f5a687e1d86..50fa0ada3d14 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -60,7 +60,7 @@ document-features = "0.2" glam.workspace = true half.workspace = true image = { workspace = true, default-features = false, features = ["jpeg"] } -itertools = "0.10" +itertools = { workspace = true } macaw.workspace = true mimalloc = { workspace = true, features = ["local_dynamic_tls"] } numpy = { version = "0.18.0", features = ["half"] } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 9e6f33c4c91c..b3e91b3c5b25 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -10,8 +10,8 @@ use pyo3::{ }; use re_log_types::{ component_types, - msg_bundle::{self, ComponentBundle, MsgBundle, MsgBundleError}, - EntityPath, LogMsg, MsgId, TimePoint, + msg_bundle::{self, MsgBundle, MsgBundleError}, + DataCell, EntityPath, LogMsg, MsgId, TimePoint, }; /// Perform conversion between a pyarrow array to arrow2 types. @@ -98,19 +98,17 @@ pub fn build_chunk_from_components( |iter| iter.unzip(), )?; - let cmp_bundles = arrays + let cells = arrays .into_iter() .zip(fields.into_iter()) - .map(|(value, field)| { - ComponentBundle::new(field.name.into(), msg_bundle::wrap_in_listarray(value)) - }) + .map(|(value, field)| DataCell::from_arrow(field.name.into(), value)) .collect(); let msg_bundle = MsgBundle::new( MsgId::random(), entity_path.clone(), time_point.clone(), - cmp_bundles, + cells, ); let msg = msg_bundle