diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index c32ebf1d473c..ec95c47f0505 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -16,8 +16,10 @@ mod arrow_util; mod store; +mod store_format; mod store_gc; mod store_read; +mod store_sanity; mod store_stats; mod store_write; diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 153849e46f0d..7655aefd2e05 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -2,13 +2,11 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; use std::num::NonZeroU64; use std::sync::atomic::AtomicU64; -use anyhow::{anyhow, ensure}; -use arrow2::array::{Array, Int64Array, UInt64Array}; +use arrow2::array::{Array, Int64Array}; use arrow2::datatypes::{DataType, TimeUnit}; use nohash_hasher::{IntMap, IntSet}; use parking_lot::RwLock; -use re_format::{arrow, format_bytes, format_number}; use re_log_types::{ ComponentName, EntityPath, EntityPathHash, MsgId, TimeInt, TimePoint, TimeRange, Timeline, }; @@ -81,24 +79,9 @@ impl RowIndex { } } -impl std::fmt::Display for RowIndex { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.kind() { - RowIndexKind::Temporal => f.write_fmt(format_args!("Temporal({})", self.0)), - RowIndexKind::Timeless => f.write_fmt(format_args!("Timeless({})", self.0)), - } - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct IndexRowNr(pub(crate) u64); -impl std::fmt::Display for IndexRowNr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("{}", self.0)) - } -} - // --- Data store --- #[derive(Debug, Clone)] @@ -311,218 +294,6 @@ impl DataStore { pub fn lookup_data_type(&self, component: &ComponentName) -> Option<&DataType> { self.components.get(component).map(|c| &c.datatype) } - - /// Runs the sanity check suite for the entire datastore. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - // Row indices should be continuous across all index tables. - if self.gc_id == 0 { - let mut row_indices: IntMap<_, Vec> = IntMap::default(); - for table in self.indices.values() { - for bucket in table.buckets.values() { - for (comp, index) in &bucket.indices.read().indices { - let row_indices = row_indices.entry(*comp).or_default(); - row_indices.extend(index.iter().flatten().map(|row_idx| row_idx.as_u64())); - } - } - } - - for (comp, mut row_indices) in row_indices { - // Not an actual row index! - if comp == DataStore::insert_id_key() { - continue; - } - - row_indices.sort(); - row_indices.dedup(); - for pair in row_indices.windows(2) { - let &[i1, i2] = pair else { unreachable!() }; - ensure!( - i1 + 1 == i2, - "found hole in index coverage for {comp:?}: \ - in {row_indices:?}, {i1} -> {i2}" - ); - } - } - } - - // Row indices should be continuous across all timeless index tables. - { - let mut row_indices: IntMap<_, Vec> = IntMap::default(); - for table in self.timeless_indices.values() { - for (comp, index) in &table.indices { - let row_indices = row_indices.entry(*comp).or_default(); - row_indices.extend(index.iter().flatten().map(|row_idx| row_idx.as_u64())); - } - } - - for (comp, mut row_indices) in row_indices { - // Not an actual row index! - if comp == DataStore::insert_id_key() { - continue; - } - - row_indices.sort(); - row_indices.dedup(); - for pair in row_indices.windows(2) { - let &[i1, i2] = pair else { unreachable!() }; - ensure!( - i1 + 1 == i2, - "found hole in timeless index coverage for {comp:?}: \ - in {row_indices:?}, {i1} -> {i2}" - ); - } - } - } - - for table in self.timeless_indices.values() { - table.sanity_check()?; - } - for table in self.timeless_components.values() { - table.sanity_check()?; - } - - for table in self.indices.values() { - table.sanity_check()?; - } - for table in self.components.values() { - table.sanity_check()?; - } - - Ok(()) - } - - /// The oldest time for which we have any data. - /// - /// Ignores timeless data. - /// - /// Useful to call after a gc. - pub fn oldest_time_per_timeline(&self) -> BTreeMap { - crate::profile_function!(); - - let mut oldest_time_per_timeline = BTreeMap::default(); - - for component_table in self.components.values() { - for bucket in &component_table.buckets { - for (timeline, time_range) in &bucket.time_ranges { - let entry = oldest_time_per_timeline - .entry(*timeline) - .or_insert(TimeInt::MAX); - *entry = time_range.min.min(*entry); - } - } - } - - oldest_time_per_timeline - } -} - -impl std::fmt::Display for DataStore { - #[allow(clippy::string_add)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - cluster_key, - config, - cluster_comp_cache: _, - messages: _, - indices, - components, - timeless_indices, - timeless_components, - insert_id: _, - query_id: _, - gc_id: _, - } = self; - - f.write_str("DataStore {\n")?; - - f.write_str(&indent::indent_all_by( - 4, - format!("cluster_key: {cluster_key:?}\n"), - ))?; - f.write_str(&indent::indent_all_by(4, format!("config: {config:?}\n")))?; - - { - f.write_str(&indent::indent_all_by( - 4, - format!( - "{} timeless index tables, for a total of {} across {} total rows\n", - timeless_indices.len(), - format_bytes(self.total_timeless_index_size_bytes() as _), - format_number(self.total_timeless_index_rows() as _) - ), - ))?; - f.write_str(&indent::indent_all_by(4, "timeless_indices: [\n"))?; - for table in timeless_indices.values() { - f.write_str(&indent::indent_all_by(8, "PersistentIndexTable {\n"))?; - f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; - f.write_str(&indent::indent_all_by(8, "}\n"))?; - } - f.write_str(&indent::indent_all_by(4, "]\n"))?; - } - { - f.write_str(&indent::indent_all_by( - 4, - format!( - "{} persistent component tables, for a total of {} across {} total rows\n", - timeless_components.len(), - format_bytes(self.total_timeless_component_size_bytes() as _), - format_number(self.total_timeless_component_rows() as _) - ), - ))?; - f.write_str(&indent::indent_all_by(4, "timeless_components: [\n"))?; - for table in timeless_components.values() { - f.write_str(&indent::indent_all_by(8, "PersistentComponentTable {\n"))?; - f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; - f.write_str(&indent::indent_all_by(8, "}\n"))?; - } - f.write_str(&indent::indent_all_by(4, "]\n"))?; - } - - { - f.write_str(&indent::indent_all_by( - 4, - format!( - "{} index tables, for a total of {} across {} total rows\n", - indices.len(), - format_bytes(self.total_temporal_index_size_bytes() as _), - format_number(self.total_temporal_index_rows() as _) - ), - ))?; - f.write_str(&indent::indent_all_by(4, "indices: [\n"))?; - for table in indices.values() { - f.write_str(&indent::indent_all_by(8, "IndexTable {\n"))?; - f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; - f.write_str(&indent::indent_all_by(8, "}\n"))?; - } - f.write_str(&indent::indent_all_by(4, "]\n"))?; - } - { - f.write_str(&indent::indent_all_by( - 4, - format!( - "{} component tables, for a total of {} across {} total rows\n", - components.len(), - format_bytes(self.total_temporal_component_size_bytes() as _), - format_number(self.total_temporal_component_rows() as _) - ), - ))?; - f.write_str(&indent::indent_all_by(4, "components: [\n"))?; - for table in components.values() { - f.write_str(&indent::indent_all_by(8, "ComponentTable {\n"))?; - f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; - f.write_str(&indent::indent_all_by(8, "}\n"))?; - } - f.write_str(&indent::indent_all_by(4, "]\n"))?; - } - - f.write_str("}")?; - - Ok(()) - } } // --- Persistent Indices --- @@ -554,99 +325,6 @@ pub struct PersistentIndexTable { pub(crate) all_components: IntSet, } -impl std::fmt::Display for PersistentIndexTable { - #[allow(clippy::string_add)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - ent_path, - cluster_key: _, - num_rows: _, - indices: _, - all_components: _, - } = self; - - f.write_fmt(format_args!("entity: {ent_path}\n"))?; - - f.write_fmt(format_args!( - "size: {} across {} rows\n", - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), - ))?; - - let (col_names, cols) = self.named_indices(); - - let names = col_names.into_iter().map(|name| name.to_string()); - let values = cols.into_iter().map(|c| c.boxed()); - let table = arrow::format_table(values, names); - - f.write_fmt(format_args!("data:\n{table}\n"))?; - - Ok(()) - } -} - -impl PersistentIndexTable { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - let Self { - ent_path: _, - cluster_key, - num_rows, - indices, - all_components: _, - } = self; - - // All indices should be `Self::num_rows` long. - { - for (comp, index) in indices { - let secondary_len = index.len() as u64; - ensure!( - *num_rows == secondary_len, - "found rogue secondary index for {comp:?}: \ - expected {num_rows} rows, got {secondary_len} instead", - ); - } - } - - // The cluster index must be fully dense. - { - let cluster_idx = indices - .get(cluster_key) - .ok_or_else(|| anyhow!("no index found for cluster key: {cluster_key:?}"))?; - ensure!( - cluster_idx.iter().all(|row| row.is_some()), - "the cluster index ({cluster_key:?}) must be fully dense: \ - got {cluster_idx:?}", - ); - } - - Ok(()) - } - - pub fn named_indices(&self) -> (Vec, Vec) { - crate::profile_function!(); - - self.indices - .iter() - .map(|(name, index)| { - ( - name, - UInt64Array::from( - index - .iter() - .map(|row_idx| row_idx.map(|row_idx| row_idx.as_u64())) - .collect::>(), - ), - ) - }) - .unzip() - } -} - // --- Indices --- /// An `IndexTable` maps specific points in time to rows in component tables. @@ -783,80 +461,10 @@ pub struct IndexTable { pub(crate) all_components: IntSet, } -impl std::fmt::Display for IndexTable { - #[allow(clippy::string_add)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - timeline, - ent_path, - buckets, - cluster_key: _, - all_components: _, - } = self; - - f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?; - f.write_fmt(format_args!("entity: {ent_path}\n"))?; - - f.write_fmt(format_args!( - "size: {} buckets for a total of {} across {} total rows\n", - self.buckets.len(), - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), - ))?; - f.write_str("buckets: [\n")?; - for (time, bucket) in buckets.iter() { - f.write_str(&indent::indent_all_by(4, "IndexBucket {\n"))?; - f.write_str(&indent::indent_all_by( - 8, - format!("index time bound: >= {}\n", timeline.typ().format(*time),), - ))?; - f.write_str(&indent::indent_all_by(8, bucket.to_string()))?; - f.write_str(&indent::indent_all_by(4, "}\n"))?; - } - f.write_str("]")?; - - Ok(()) - } -} - impl IndexTable { pub fn entity_path(&self) -> &EntityPath { &self.ent_path } - - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - // No two buckets should ever overlap time-range-wise. - { - let time_ranges = self - .buckets - .values() - .map(|bucket| bucket.indices.read().time_range) - .collect::>(); - for time_ranges in time_ranges.windows(2) { - let &[t1, t2] = time_ranges else { unreachable!() }; - ensure!( - t1.max.as_i64() < t2.min.as_i64(), - "found overlapping index buckets: {} ({}) <-> {} ({})", - self.timeline.typ().format(t1.max), - t1.max.as_i64(), - self.timeline.typ().format(t2.min), - t2.min.as_i64(), - ); - } - } - - // Run individual bucket sanity check suites too. - for bucket in self.buckets.values() { - bucket.sanity_check()?; - } - - Ok(()) - } } /// An `IndexBucket` holds a size-delimited (data size and/or number of rows) chunk of a @@ -917,42 +525,7 @@ impl Default for IndexBucketIndices { } } -impl std::fmt::Display for IndexBucket { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "size: {} across {} rows\n", - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), - ))?; - - f.write_fmt(format_args!("{}\n", self.formatted_time_range()))?; - - let (timeline_name, times) = self.times(); - let (col_names, cols) = self.named_indices(); - - let names = std::iter::once(timeline_name) - .chain(col_names.into_iter().map(|name| name.to_string())); - let values = std::iter::once(times.boxed()).chain(cols.into_iter().map(|c| c.boxed())); - let table = arrow::format_table(values, names); - - let is_sorted = self.is_sorted(); - f.write_fmt(format_args!("data (sorted={is_sorted}):\n{table}\n"))?; - - Ok(()) - } -} - impl IndexBucket { - /// Returns a formatted string of the time range in the bucket - pub fn formatted_time_range(&self) -> String { - let time_range = &self.indices.read().time_range; - if time_range.min.as_i64() != i64::MAX && time_range.max.as_i64() != i64::MIN { - self.timeline.format_time_range(time_range) - } else { - "time range: N/A\n".to_owned() - } - } - /// Returns an (name, [`Int64Array`]) with a logical type matching the timeline. pub fn times(&self) -> (String, Int64Array) { crate::profile_function!(); @@ -964,70 +537,6 @@ impl IndexBucket { }; (self.timeline.name().to_string(), times.to(logical_type)) } - - /// Returns a Vec each of (name, array) for each index in the bucket - pub fn named_indices(&self) -> (Vec, Vec) { - crate::profile_function!(); - - self.indices - .read() - .indices - .iter() - .map(|(name, index)| { - ( - name, - UInt64Array::from( - index - .iter() - .map(|row_idx| row_idx.map(|row_idx| row_idx.as_u64())) - .collect::>(), - ), - ) - }) - .unzip() - } - - /// Runs the sanity check suite for the entire bucket. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - let IndexBucketIndices { - is_sorted: _, - time_range: _, - times, - indices, - } = &*self.indices.read(); - - // All indices should contain the exact same number of rows as the time index. - { - let primary_len = times.len(); - for (comp, index) in indices { - let secondary_len = index.len(); - ensure!( - primary_len == secondary_len, - "found rogue secondary index for {comp:?}: \ - expected {primary_len} rows, got {secondary_len} instead", - ); - } - } - - // The cluster index must be fully dense. - { - let cluster_key = self.cluster_key; - let cluster_idx = indices - .get(&cluster_key) - .ok_or_else(|| anyhow!("no index found for cluster key: {cluster_key:?}"))?; - ensure!( - cluster_idx.iter().all(|row| row.is_some()), - "the cluster index ({cluster_key:?}) must be fully dense: \ - got {cluster_idx:?}", - ); - } - - Ok(()) - } } // --- Persistent Components --- @@ -1079,65 +588,6 @@ pub struct PersistentComponentTable { pub(crate) total_size_bytes: u64, } -impl std::fmt::Display for PersistentComponentTable { - #[allow(clippy::string_add)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - name, - datatype, - chunks, - total_rows, - total_size_bytes, - } = self; - - f.write_fmt(format_args!("name: {name}\n"))?; - if matches!( - std::env::var("RERUN_DATA_STORE_DISPLAY_SCHEMAS").as_deref(), - Ok("1") - ) { - f.write_fmt(format_args!("datatype: {datatype:#?}\n"))?; - } - - f.write_fmt(format_args!( - "size: {} across {} total rows\n", - format_bytes(*total_size_bytes as _), - format_number(*total_rows as _), - ))?; - - let data = { - use arrow2::compute::concatenate::concatenate; - let chunks = chunks.iter().map(|chunk| &**chunk).collect::>(); - concatenate(&chunks).unwrap() - }; - - let table = arrow::format_table([data], [self.name.as_str()]); - f.write_fmt(format_args!("{table}\n"))?; - - Ok(()) - } -} - -impl PersistentComponentTable { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - // All chunks should always be dense - { - for chunk in &self.chunks { - ensure!( - chunk.validity().is_none(), - "persistent component chunks should always be dense", - ); - } - } - - Ok(()) - } -} - // --- Components --- /// A `ComponentTable` holds all the values ever inserted for a given component (provided they @@ -1236,72 +686,6 @@ pub struct ComponentTable { pub(crate) buckets: VecDeque, } -impl std::fmt::Display for ComponentTable { - #[allow(clippy::string_add)] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - name, - datatype, - buckets, - } = self; - - f.write_fmt(format_args!("name: {name}\n"))?; - if matches!( - std::env::var("RERUN_DATA_STORE_DISPLAY_SCHEMAS").as_deref(), - Ok("1") - ) { - f.write_fmt(format_args!("datatype: {datatype:#?}\n"))?; - } - - f.write_fmt(format_args!( - "size: {} buckets for a total of {} across {} total rows\n", - self.buckets.len(), - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), - ))?; - f.write_str("buckets: [\n")?; - for bucket in buckets { - f.write_str(&indent::indent_all_by(4, "ComponentBucket {\n"))?; - f.write_str(&indent::indent_all_by(8, bucket.to_string()))?; - f.write_str(&indent::indent_all_by(4, "}\n"))?; - } - f.write_str("]")?; - - Ok(()) - } -} - -impl ComponentTable { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - // No two buckets should ever overlap row-range-wise. - { - let row_ranges = self - .buckets - .iter() - .map(|bucket| bucket.row_offset..bucket.row_offset + bucket.total_rows()) - .collect::>(); - for row_ranges in row_ranges.windows(2) { - let &[r1, r2] = &row_ranges else { unreachable!() }; - ensure!( - !r1.contains(&r2.start), - "found overlapping component buckets: {r1:?} <-> {r2:?}" - ); - } - } - - for bucket in &self.buckets { - bucket.sanity_check()?; - } - - Ok(()) - } -} - /// A `ComponentBucket` holds a size-delimited (data size) chunk of a [`ComponentTable`]. #[derive(Debug)] pub struct ComponentBucket { @@ -1367,71 +751,3 @@ pub struct ComponentBucket { /// cache this. pub(crate) total_size_bytes: u64, } - -impl std::fmt::Display for ComponentBucket { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!( - "size: {} across {} rows\n", - format_bytes(self.total_size_bytes() as _), - format_number(self.total_rows() as _), - ))?; - - f.write_fmt(format_args!( - "row range: from {} to {} (all inclusive)\n", - self.row_offset, - // Component buckets can never be empty at the moment: - // - the first bucket is always initialized with a single empty row - // - all buckets that follow are lazily instantiated when data get inserted - // - // TODO(#439): is that still true with deletion? - // TODO(#589): support for non-unit-length chunks - self.row_offset - + self - .chunks - .len() - .checked_sub(1) - .expect("buckets are never empty") as u64, - ))?; - - f.write_fmt(format_args!("archived: {}\n", self.archived))?; - f.write_str("time ranges:\n")?; - for (timeline, time_range) in &self.time_ranges { - f.write_fmt(format_args!( - "{}\n", - &timeline.format_time_range(time_range) - ))?; - } - - let data = { - use arrow2::compute::concatenate::concatenate; - let chunks = self.chunks.iter().map(|chunk| &**chunk).collect::>(); - concatenate(&chunks).unwrap() - }; - - let table = arrow::format_table([data], [self.name.as_str()]); - f.write_fmt(format_args!("{table}\n"))?; - - Ok(()) - } -} - -impl ComponentBucket { - /// Runs the sanity check suite for the entire table. - /// - /// Returns an error if anything looks wrong. - pub fn sanity_check(&self) -> anyhow::Result<()> { - crate::profile_function!(); - - // All chunks should always be dense - { - for chunk in &self.chunks { - ensure!( - chunk.validity().is_none(), - "component bucket chunks should always be dense", - ); - } - } - - Ok(()) - } -} diff --git a/crates/re_arrow_store/src/store_format.rs b/crates/re_arrow_store/src/store_format.rs new file mode 100644 index 000000000000..06b016b59cfd --- /dev/null +++ b/crates/re_arrow_store/src/store_format.rs @@ -0,0 +1,389 @@ +use arrow2::array::UInt64Array; +use re_format::{arrow, format_bytes, format_number}; + +use crate::{ + ComponentBucket, ComponentTable, DataStore, IndexBucket, IndexRowNr, IndexTable, + PersistentComponentTable, PersistentIndexTable, RowIndex, RowIndexKind, +}; + +// --- Indices & offsets --- + +impl std::fmt::Display for RowIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.kind() { + RowIndexKind::Temporal => f.write_fmt(format_args!("Temporal({})", self.0)), + RowIndexKind::Timeless => f.write_fmt(format_args!("Timeless({})", self.0)), + } + } +} + +impl std::fmt::Display for IndexRowNr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}", self.0)) + } +} + +// --- Data store --- + +impl std::fmt::Display for DataStore { + #[allow(clippy::string_add)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + cluster_key, + config, + cluster_comp_cache: _, + messages: _, + indices, + components, + timeless_indices, + timeless_components, + insert_id: _, + query_id: _, + gc_id: _, + } = self; + + f.write_str("DataStore {\n")?; + + f.write_str(&indent::indent_all_by( + 4, + format!("cluster_key: {cluster_key:?}\n"), + ))?; + f.write_str(&indent::indent_all_by(4, format!("config: {config:?}\n")))?; + + { + f.write_str(&indent::indent_all_by( + 4, + format!( + "{} timeless index tables, for a total of {} across {} total rows\n", + timeless_indices.len(), + format_bytes(self.total_timeless_index_size_bytes() as _), + format_number(self.total_timeless_index_rows() as _) + ), + ))?; + f.write_str(&indent::indent_all_by(4, "timeless_indices: [\n"))?; + for table in timeless_indices.values() { + f.write_str(&indent::indent_all_by(8, "PersistentIndexTable {\n"))?; + f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; + f.write_str(&indent::indent_all_by(8, "}\n"))?; + } + f.write_str(&indent::indent_all_by(4, "]\n"))?; + } + { + f.write_str(&indent::indent_all_by( + 4, + format!( + "{} persistent component tables, for a total of {} across {} total rows\n", + timeless_components.len(), + format_bytes(self.total_timeless_component_size_bytes() as _), + format_number(self.total_timeless_component_rows() as _) + ), + ))?; + f.write_str(&indent::indent_all_by(4, "timeless_components: [\n"))?; + for table in timeless_components.values() { + f.write_str(&indent::indent_all_by(8, "PersistentComponentTable {\n"))?; + f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; + f.write_str(&indent::indent_all_by(8, "}\n"))?; + } + f.write_str(&indent::indent_all_by(4, "]\n"))?; + } + + { + f.write_str(&indent::indent_all_by( + 4, + format!( + "{} index tables, for a total of {} across {} total rows\n", + indices.len(), + format_bytes(self.total_temporal_index_size_bytes() as _), + format_number(self.total_temporal_index_rows() as _) + ), + ))?; + f.write_str(&indent::indent_all_by(4, "indices: [\n"))?; + for table in indices.values() { + f.write_str(&indent::indent_all_by(8, "IndexTable {\n"))?; + f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; + f.write_str(&indent::indent_all_by(8, "}\n"))?; + } + f.write_str(&indent::indent_all_by(4, "]\n"))?; + } + { + f.write_str(&indent::indent_all_by( + 4, + format!( + "{} component tables, for a total of {} across {} total rows\n", + components.len(), + format_bytes(self.total_temporal_component_size_bytes() as _), + format_number(self.total_temporal_component_rows() as _) + ), + ))?; + f.write_str(&indent::indent_all_by(4, "components: [\n"))?; + for table in components.values() { + f.write_str(&indent::indent_all_by(8, "ComponentTable {\n"))?; + f.write_str(&indent::indent_all_by(12, table.to_string() + "\n"))?; + f.write_str(&indent::indent_all_by(8, "}\n"))?; + } + f.write_str(&indent::indent_all_by(4, "]\n"))?; + } + + f.write_str("}")?; + + Ok(()) + } +} + +// --- Persistent Indices --- + +impl std::fmt::Display for PersistentIndexTable { + #[allow(clippy::string_add)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + ent_path, + cluster_key: _, + num_rows: _, + indices: _, + all_components: _, + } = self; + + f.write_fmt(format_args!("entity: {ent_path}\n"))?; + + f.write_fmt(format_args!( + "size: {} across {} rows\n", + format_bytes(self.total_size_bytes() as _), + format_number(self.total_rows() as _), + ))?; + + let (col_names, cols): (Vec<_>, Vec<_>) = { + self.indices + .iter() + .map(|(name, index)| { + ( + name.to_string(), + UInt64Array::from( + index + .iter() + .map(|row_idx| row_idx.map(|row_idx| row_idx.as_u64())) + .collect::>(), + ), + ) + }) + .unzip() + }; + + let values = cols.into_iter().map(|c| c.boxed()); + let table = arrow::format_table(values, col_names); + + f.write_fmt(format_args!("data:\n{table}\n"))?; + + Ok(()) + } +} + +// --- Indices --- + +impl std::fmt::Display for IndexTable { + #[allow(clippy::string_add)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + timeline, + ent_path, + buckets, + cluster_key: _, + all_components: _, + } = self; + + f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?; + f.write_fmt(format_args!("entity: {ent_path}\n"))?; + + f.write_fmt(format_args!( + "size: {} buckets for a total of {} across {} total rows\n", + self.buckets.len(), + format_bytes(self.total_size_bytes() as _), + format_number(self.total_rows() as _), + ))?; + f.write_str("buckets: [\n")?; + for (time, bucket) in buckets.iter() { + f.write_str(&indent::indent_all_by(4, "IndexBucket {\n"))?; + f.write_str(&indent::indent_all_by( + 8, + format!("index time bound: >= {}\n", timeline.typ().format(*time),), + ))?; + f.write_str(&indent::indent_all_by(8, bucket.to_string()))?; + f.write_str(&indent::indent_all_by(4, "}\n"))?; + } + f.write_str("]")?; + + Ok(()) + } +} + +impl std::fmt::Display for IndexBucket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "size: {} across {} rows\n", + format_bytes(self.total_size_bytes() as _), + format_number(self.total_rows() as _), + ))?; + + let time_range = { + let time_range = &self.indices.read().time_range; + if time_range.min.as_i64() != i64::MAX && time_range.max.as_i64() != i64::MIN { + self.timeline.format_time_range(time_range) + } else { + "time range: N/A\n".to_owned() + } + }; + f.write_fmt(format_args!("{}\n", time_range))?; + + let (timeline_name, times) = self.times(); + let (col_names, cols): (Vec<_>, Vec<_>) = { + self.indices + .read() + .indices + .iter() + .map(|(name, index)| { + ( + name.to_string(), + UInt64Array::from( + index + .iter() + .map(|row_idx| row_idx.map(|row_idx| row_idx.as_u64())) + .collect::>(), + ), + ) + }) + .unzip() + }; + + let names = std::iter::once(timeline_name).chain(col_names); + let values = std::iter::once(times.boxed()).chain(cols.into_iter().map(|c| c.boxed())); + let table = arrow::format_table(values, names); + + let is_sorted = self.is_sorted(); + f.write_fmt(format_args!("data (sorted={is_sorted}):\n{table}\n"))?; + + Ok(()) + } +} + +// --- Persistent Components --- + +impl std::fmt::Display for PersistentComponentTable { + #[allow(clippy::string_add)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + name, + datatype, + chunks, + total_rows, + total_size_bytes, + } = self; + + f.write_fmt(format_args!("name: {name}\n"))?; + if matches!( + std::env::var("RERUN_DATA_STORE_DISPLAY_SCHEMAS").as_deref(), + Ok("1") + ) { + f.write_fmt(format_args!("datatype: {datatype:#?}\n"))?; + } + + f.write_fmt(format_args!( + "size: {} across {} total rows\n", + format_bytes(*total_size_bytes as _), + format_number(*total_rows as _), + ))?; + + let data = { + use arrow2::compute::concatenate::concatenate; + let chunks = chunks.iter().map(|chunk| &**chunk).collect::>(); + concatenate(&chunks).unwrap() + }; + + let table = arrow::format_table([data], [self.name.as_str()]); + f.write_fmt(format_args!("{table}\n"))?; + + Ok(()) + } +} + +// --- Components --- + +impl std::fmt::Display for ComponentTable { + #[allow(clippy::string_add)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + name, + datatype, + buckets, + } = self; + + f.write_fmt(format_args!("name: {name}\n"))?; + if matches!( + std::env::var("RERUN_DATA_STORE_DISPLAY_SCHEMAS").as_deref(), + Ok("1") + ) { + f.write_fmt(format_args!("datatype: {datatype:#?}\n"))?; + } + + f.write_fmt(format_args!( + "size: {} buckets for a total of {} across {} total rows\n", + self.buckets.len(), + format_bytes(self.total_size_bytes() as _), + format_number(self.total_rows() as _), + ))?; + f.write_str("buckets: [\n")?; + for bucket in buckets { + f.write_str(&indent::indent_all_by(4, "ComponentBucket {\n"))?; + f.write_str(&indent::indent_all_by(8, bucket.to_string()))?; + f.write_str(&indent::indent_all_by(4, "}\n"))?; + } + f.write_str("]")?; + + Ok(()) + } +} + +impl std::fmt::Display for ComponentBucket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "size: {} across {} rows\n", + format_bytes(self.total_size_bytes() as _), + format_number(self.total_rows() as _), + ))?; + + f.write_fmt(format_args!( + "row range: from {} to {} (all inclusive)\n", + self.row_offset, + // Component buckets can never be empty at the moment: + // - the first bucket is always initialized with a single empty row + // - all buckets that follow are lazily instantiated when data get inserted + // + // TODO(#439): is that still true with deletion? + // TODO(#589): support for non-unit-length chunks + self.row_offset + + self + .chunks + .len() + .checked_sub(1) + .expect("buckets are never empty") as u64, + ))?; + + f.write_fmt(format_args!("archived: {}\n", self.archived))?; + f.write_str("time ranges:\n")?; + for (timeline, time_range) in &self.time_ranges { + f.write_fmt(format_args!( + "{}\n", + &timeline.format_time_range(time_range) + ))?; + } + + let data = { + use arrow2::compute::concatenate::concatenate; + let chunks = self.chunks.iter().map(|chunk| &**chunk).collect::>(); + concatenate(&chunks).unwrap() + }; + + let table = arrow::format_table([data], [self.name.as_str()]); + f.write_fmt(format_args!("{table}\n"))?; + + Ok(()) + } +} diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs new file mode 100644 index 000000000000..f002f1c13d8c --- /dev/null +++ b/crates/re_arrow_store/src/store_sanity.rs @@ -0,0 +1,327 @@ +use std::collections::BTreeMap; + +use anyhow::{anyhow, ensure}; +use nohash_hasher::IntMap; +use re_log_types::{TimeInt, Timeline}; + +use crate::{ + ComponentBucket, ComponentTable, DataStore, IndexBucket, IndexBucketIndices, IndexTable, + PersistentComponentTable, PersistentIndexTable, +}; + +// TODO(#527): Typed errors. + +// --- Data store --- + +impl DataStore { + /// Runs the sanity check suite for the entire datastore. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + // Row indices should be continuous across all index tables. + if self.gc_id == 0 { + let mut row_indices: IntMap<_, Vec> = IntMap::default(); + for table in self.indices.values() { + for bucket in table.buckets.values() { + for (comp, index) in &bucket.indices.read().indices { + let row_indices = row_indices.entry(*comp).or_default(); + row_indices.extend(index.iter().flatten().map(|row_idx| row_idx.as_u64())); + } + } + } + + for (comp, mut row_indices) in row_indices { + // Not an actual row index! + if comp == DataStore::insert_id_key() { + continue; + } + + row_indices.sort(); + row_indices.dedup(); + for pair in row_indices.windows(2) { + let &[i1, i2] = pair else { unreachable!() }; + ensure!( + i1 + 1 == i2, + "found hole in index coverage for {comp:?}: \ + in {row_indices:?}, {i1} -> {i2}" + ); + } + } + } + + // Row indices should be continuous across all timeless index tables. + { + let mut row_indices: IntMap<_, Vec> = IntMap::default(); + for table in self.timeless_indices.values() { + for (comp, index) in &table.indices { + let row_indices = row_indices.entry(*comp).or_default(); + row_indices.extend(index.iter().flatten().map(|row_idx| row_idx.as_u64())); + } + } + + for (comp, mut row_indices) in row_indices { + // Not an actual row index! + if comp == DataStore::insert_id_key() { + continue; + } + + row_indices.sort(); + row_indices.dedup(); + for pair in row_indices.windows(2) { + let &[i1, i2] = pair else { unreachable!() }; + ensure!( + i1 + 1 == i2, + "found hole in timeless index coverage for {comp:?}: \ + in {row_indices:?}, {i1} -> {i2}" + ); + } + } + } + + for table in self.timeless_indices.values() { + table.sanity_check()?; + } + for table in self.timeless_components.values() { + table.sanity_check()?; + } + + for table in self.indices.values() { + table.sanity_check()?; + } + for table in self.components.values() { + table.sanity_check()?; + } + + Ok(()) + } + + /// The oldest time for which we have any data. + /// + /// Ignores timeless data. + /// + /// Useful to call after a gc. + pub fn oldest_time_per_timeline(&self) -> BTreeMap { + crate::profile_function!(); + + let mut oldest_time_per_timeline = BTreeMap::default(); + + for component_table in self.components.values() { + for bucket in &component_table.buckets { + for (timeline, time_range) in &bucket.time_ranges { + let entry = oldest_time_per_timeline + .entry(*timeline) + .or_insert(TimeInt::MAX); + *entry = time_range.min.min(*entry); + } + } + } + + oldest_time_per_timeline + } +} + +// --- Persistent Indices --- + +impl PersistentIndexTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + let Self { + ent_path: _, + cluster_key, + num_rows, + indices, + all_components: _, + } = self; + + // All indices should be `Self::num_rows` long. + { + for (comp, index) in indices { + let secondary_len = index.len() as u64; + ensure!( + *num_rows == secondary_len, + "found rogue secondary index for {comp:?}: \ + expected {num_rows} rows, got {secondary_len} instead", + ); + } + } + + // The cluster index must be fully dense. + { + let cluster_idx = indices + .get(cluster_key) + .ok_or_else(|| anyhow!("no index found for cluster key: {cluster_key:?}"))?; + ensure!( + cluster_idx.iter().all(|row| row.is_some()), + "the cluster index ({cluster_key:?}) must be fully dense: \ + got {cluster_idx:?}", + ); + } + + Ok(()) + } +} + +// --- Indices --- + +impl IndexTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + // No two buckets should ever overlap time-range-wise. + { + let time_ranges = self + .buckets + .values() + .map(|bucket| bucket.indices.read().time_range) + .collect::>(); + for time_ranges in time_ranges.windows(2) { + let &[t1, t2] = time_ranges else { unreachable!() }; + ensure!( + t1.max.as_i64() < t2.min.as_i64(), + "found overlapping index buckets: {} ({}) <-> {} ({})", + self.timeline.typ().format(t1.max), + t1.max.as_i64(), + self.timeline.typ().format(t2.min), + t2.min.as_i64(), + ); + } + } + + // Run individual bucket sanity check suites too. + for bucket in self.buckets.values() { + bucket.sanity_check()?; + } + + Ok(()) + } +} + +impl IndexBucket { + /// Runs the sanity check suite for the entire bucket. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + let IndexBucketIndices { + is_sorted: _, + time_range: _, + times, + indices, + } = &*self.indices.read(); + + // All indices should contain the exact same number of rows as the time index. + { + let primary_len = times.len(); + for (comp, index) in indices { + let secondary_len = index.len(); + ensure!( + primary_len == secondary_len, + "found rogue secondary index for {comp:?}: \ + expected {primary_len} rows, got {secondary_len} instead", + ); + } + } + + // The cluster index must be fully dense. + { + let cluster_key = self.cluster_key; + let cluster_idx = indices + .get(&cluster_key) + .ok_or_else(|| anyhow!("no index found for cluster key: {cluster_key:?}"))?; + ensure!( + cluster_idx.iter().all(|row| row.is_some()), + "the cluster index ({cluster_key:?}) must be fully dense: \ + got {cluster_idx:?}", + ); + } + + Ok(()) + } +} + +// --- Persistent Components --- + +impl PersistentComponentTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + // All chunks should always be dense + { + for chunk in &self.chunks { + ensure!( + chunk.validity().is_none(), + "persistent component chunks should always be dense", + ); + } + } + + Ok(()) + } +} + +// --- Components --- + +impl ComponentTable { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + // No two buckets should ever overlap row-range-wise. + { + let row_ranges = self + .buckets + .iter() + .map(|bucket| bucket.row_offset..bucket.row_offset + bucket.total_rows()) + .collect::>(); + for row_ranges in row_ranges.windows(2) { + let &[r1, r2] = &row_ranges else { unreachable!() }; + ensure!( + !r1.contains(&r2.start), + "found overlapping component buckets: {r1:?} <-> {r2:?}" + ); + } + } + + for bucket in &self.buckets { + bucket.sanity_check()?; + } + + Ok(()) + } +} + +impl ComponentBucket { + /// Runs the sanity check suite for the entire table. + /// + /// Returns an error if anything looks wrong. + pub fn sanity_check(&self) -> anyhow::Result<()> { + crate::profile_function!(); + + // All chunks should always be dense + { + for chunk in &self.chunks { + ensure!( + chunk.validity().is_none(), + "component bucket chunks should always be dense", + ); + } + } + + Ok(()) + } +}