make it all impossible to mis-use
teh-cmc committed Mar 15, 2024
1 parent 8a40b20 commit bceb9bf
Showing 55 changed files with 479 additions and 497 deletions.
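The pattern behind the commit message shows up in every hunk below: time fields become private, constructors accept `impl TryInto<TimeInt>` and saturate invalid input instead of panicking, and reads go through accessors. A minimal self-contained sketch of that pattern — `MyTime` and `MyQuery` are invented stand-ins, not types from this repository:

// Invented stand-ins illustrating the "impossible to mis-use" pattern
// applied throughout this commit; not code from the repo itself.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct MyTime(i64);

impl MyTime {
    const MIN: Self = Self(i64::MIN);
}

impl TryFrom<i64> for MyTime {
    type Error = ();
    fn try_from(t: i64) -> Result<Self, Self::Error> {
        // Reserve i64::MIN as a sentinel callers can never construct directly.
        (t != i64::MIN).then_some(Self(t)).ok_or(())
    }
}

struct MyQuery {
    at: MyTime, // private: callers can no longer bypass validation
}

impl MyQuery {
    // Accept anything convertible, and saturate invalid input instead of panicking.
    fn new(at: impl TryInto<MyTime>) -> Self {
        let at = at.try_into().unwrap_or(MyTime::MIN);
        Self { at }
    }

    // Reads go through an accessor, keeping the invariant sealed.
    fn at(&self) -> MyTime {
        self.at
    }
}

fn main() {
    // Plain integers flow through `TryInto`…
    assert_eq!(MyQuery::new(42).at(), MyTime(42));
    // …and invalid input saturates instead of panicking.
    assert_eq!(MyQuery::new(i64::MIN).at(), MyTime::MIN);
}

The `try_into().unwrap_or(MIN)` choice trades a panic for a saturating default, which is what lets the `.try_into().unwrap()` noise disappear from the call sites below.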
2 changes: 1 addition & 1 deletion crates/re_data_store/benches/arrow2.rs
@@ -113,7 +113,7 @@ fn erased_clone(c: &mut Criterion) {
         .sum::<u64>();
     let expected_total_size_bytes = data.total_size_bytes();
     assert!(
-        total_size_bytes >= expected_total_size_bytes,
+        total_size_bytes + 1 >= expected_total_size_bytes,
         "Size for {} calculated to be {} bytes, but should be at least {} bytes",
         T::name(),
         total_size_bytes,
9 changes: 3 additions & 6 deletions crates/re_data_store/benches/data_store.rs
@@ -385,7 +385,7 @@ fn build_rows_with_packed(packed: bool) -> Vec<DataRow> {
         NUM_INSTANCES as _,
         false,
         packed,
-        |row_idx| TimePoint::from([build_frame_nr((row_idx as i64).try_into().unwrap())]),
+        |row_idx| TimePoint::from([build_frame_nr(row_idx as i64)]),
     )
 }

@@ -453,7 +453,7 @@ fn latest_data_at<const N: usize>(
     secondaries: &[ComponentName; N],
 ) -> [Option<DataCell>; N] {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
-    let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_ROWS / 2).try_into().unwrap());
+    let timeline_query = LatestAtQuery::new(timeline_frame_nr, NUM_ROWS / 2);
     let ent_path = EntityPath::from("large_structs");

     store

@@ -466,10 +466,7 @@ fn range_data<const N: usize>(
     components: [ComponentName; N],
 ) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + '_ {
     let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
-    let query = RangeQuery::new(
-        timeline_frame_nr,
-        TimeRange::new(0.try_into().unwrap(), NUM_ROWS.try_into().unwrap()),
-    );
+    let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(TimeInt::ZERO, NUM_ROWS));
     let ent_path = EntityPath::from("large_structs");

     store
2 changes: 1 addition & 1 deletion crates/re_data_store/benches/gc.rs
@@ -76,7 +76,7 @@ fn plotting_dashboard(c: &mut Criterion) {
     let mut timegen = |i| {
         [
             build_log_time(Time::from_seconds_since_epoch(i as _)),
-            build_frame_nr((i as i64).try_into().unwrap()),
+            build_frame_nr(i as i64),
         ]
         .into()
     };
14 changes: 7 additions & 7 deletions crates/re_data_store/src/store_event.rs
@@ -297,9 +297,9 @@ mod tests {

     let row_id1 = RowId::new();
     let timepoint1 = TimePoint::from_iter([
-        (timeline_frame, 42.try_into().unwrap()), //
-        (timeline_other, 666.try_into().unwrap()), //
-        (timeline_yet_another, 1.try_into().unwrap()), //
+        (timeline_frame, 42), //
+        (timeline_other, 666), //
+        (timeline_yet_another, 1), //
     ]);
     let entity_path1: EntityPath = "entity_a".into();
     let row1 = DataRow::from_component_batches(

@@ -339,8 +339,8 @@ mod tests {

     let row_id2 = RowId::new();
     let timepoint2 = TimePoint::from_iter([
-        (timeline_frame, 42.try_into().unwrap()), //
-        (timeline_yet_another, 1.try_into().unwrap()), //
+        (timeline_frame, 42), //
+        (timeline_yet_another, 1), //
     ]);
     let entity_path2: EntityPath = "entity_b".into();
     let row2 = {

@@ -487,7 +487,7 @@ mod tests {

     let row1 = DataRow::from_component_batches(
         RowId::new(),
-        TimePoint::from_iter([(timeline_frame, 42.try_into().unwrap())]),
+        TimePoint::from_iter([(timeline_frame, 42)]),
         "entity_a".into(),
         [&InstanceKey::from_iter(0..10) as _],
     )?;

@@ -504,7 +504,7 @@ mod tests {

     let row2 = DataRow::from_component_batches(
         RowId::new(),
-        TimePoint::from_iter([(timeline_frame, 42.try_into().unwrap())]),
+        TimePoint::from_iter([(timeline_frame, 42)]),
         "entity_b".into(),
         [&[MyColor::from(0xAABBCCDD)] as _],
     )?;
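The bare `(timeline_frame, 42)` tuples above compile because the collection constructor now presumably performs the `TryInto` conversion itself. A standalone sketch of that shape — all types here are local stand-ins, and the real `TimePoint` in re_log_types may well be built differently:

use std::collections::BTreeMap;

// Local stand-ins so the sketch compiles on its own.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Timeline(&'static str);

#[derive(Clone, Copy)]
struct TimeInt(i64);

impl TimeInt {
    const MIN: Self = Self(i64::MIN);
}

impl TryFrom<i64> for TimeInt {
    type Error = ();
    fn try_from(t: i64) -> Result<Self, ()> {
        Ok(Self(t))
    }
}

struct TimePoint(BTreeMap<Timeline, TimeInt>);

// Accepting `T: TryInto<TimeInt>` at the collection boundary is what lets
// call sites write plain integers like `(timeline_frame, 42)`.
impl<T: TryInto<TimeInt>> FromIterator<(Timeline, T)> for TimePoint {
    fn from_iter<I: IntoIterator<Item = (Timeline, T)>>(iter: I) -> Self {
        Self(
            iter.into_iter()
                .map(|(timeline, time)| (timeline, time.try_into().unwrap_or(TimeInt::MIN)))
                .collect(),
        )
    }
}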
2 changes: 1 addition & 1 deletion crates/re_data_store/src/store_format.rs
@@ -130,7 +130,7 @@ impl std::fmt::Display for IndexedBucket {

         let time_range = {
             let time_range = &self.inner.read().time_range;
-            if time_range.min != TimeInt::MAX && time_range.max != TimeInt::MIN {
+            if time_range.min() != TimeInt::MAX && time_range.max() != TimeInt::MIN {
                 format!(
                     " - {}: {}",
                     self.timeline.name(),
4 changes: 2 additions & 2 deletions crates/re_data_store/src/store_gc.rs
@@ -843,11 +843,11 @@ impl IndexedBucketInner {
             // We have at least two rows, so we can safely [index] here:
             if row_index == 0 {
                 // We removed the first row, so the second row holds the new min
-                time_range.min = TimeInt::new_temporal(col_time[1]);
+                time_range.set_min(col_time[1]);
             }
             if row_index + 1 == col_time.len() {
                 // We removed the last row, so the penultimate row holds the new max
-                time_range.max = TimeInt::new_temporal(col_time[row_index - 1]);
+                time_range.set_max(col_time[row_index - 1]);
             }
         }
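`set_min`/`set_max` replace the direct field writes here. `TimeRange`'s own definition is outside this page, so the following is a standalone sketch of the sealed shape its call sites imply — only the method names come from the diff; the bodies and the stubbed `TimeInt` are assumptions:

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TimeInt(i64);

impl TimeInt {
    pub const MIN: Self = Self(i64::MIN);
}

impl TryFrom<i64> for TimeInt {
    type Error = ();
    fn try_from(t: i64) -> Result<Self, ()> {
        Ok(Self(t))
    }
}

#[derive(Debug)]
pub struct TimeRange {
    min: TimeInt, // private: `time_range.min = …` no longer compiles from outside
    max: TimeInt,
}

impl TimeRange {
    pub fn min(&self) -> TimeInt {
        self.min
    }

    pub fn max(&self) -> TimeInt {
        self.max
    }

    // The setters accept raw i64s (e.g. `col_time[1]` above) and funnel them
    // through the same saturating conversion as the query constructors.
    pub fn set_min(&mut self, min: impl TryInto<TimeInt>) {
        self.min = min.try_into().unwrap_or(TimeInt::MIN);
    }

    pub fn set_max(&mut self, max: impl TryInto<TimeInt>) {
        self.max = max.try_into().unwrap_or(TimeInt::MIN);
    }
}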
37 changes: 26 additions & 11 deletions crates/re_data_store/src/store_read.rs
@@ -19,8 +19,8 @@ use crate::{
 /// Get the latest version of the data available at this time.
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct LatestAtQuery {
-    pub timeline: Timeline,
-    pub at: TimeInt,
+    timeline: Timeline,
+    at: TimeInt,
 }

 impl std::fmt::Debug for LatestAtQuery {

@@ -34,16 +34,30 @@ impl std::fmt::Debug for LatestAtQuery {
 }

 impl LatestAtQuery {
-    pub const fn new(timeline: Timeline, at: TimeInt) -> Self {
+    /// The returned query is guaranteed to never include [`TimeInt::STATIC`].
+    #[inline]
+    pub fn new(timeline: Timeline, at: impl TryInto<TimeInt>) -> Self {
+        let at = at.try_into().unwrap_or(TimeInt::MIN);
         Self { timeline, at }
     }

     #[inline]
     pub const fn latest(timeline: Timeline) -> Self {
         Self {
             timeline,
             at: TimeInt::MAX,
         }
     }
+
+    #[inline]
+    pub fn timeline(&self) -> Timeline {
+        self.timeline
+    }
+
+    #[inline]
+    pub fn at(&self) -> TimeInt {
+        self.at
+    }
 }

 /// A query over a time range, for a given timeline.

@@ -62,10 +76,10 @@ impl std::fmt::Debug for RangeQuery {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.write_fmt(format_args!(
             "<ranging from {} to {} (all inclusive) on {:?} ({} timeless)>",
-            self.timeline.typ().format_utc(self.range.min),
-            self.timeline.typ().format_utc(self.range.max),
+            self.timeline.typ().format_utc(self.range.min()),
+            self.timeline.typ().format_utc(self.range.max()),
             self.timeline.name(),
-            if self.range.min <= TimeInt::MIN {
+            if self.range.min() <= TimeInt::MIN {
                 "including"
             } else {
                 "excluding"

@@ -75,6 +89,7 @@ impl std::fmt::Debug for RangeQuery {
 }

 impl RangeQuery {
+    /// The returned query is guaranteed to never include [`TimeInt::STATIC`].
     pub const fn new(timeline: Timeline, range: TimeRange) -> Self {
         Self { timeline, range }
     }

@@ -192,7 +207,7 @@ impl DataStore {
             .inner
             .read()
             .time_range
-            .min;
+            .min();

         // handle case where no data was logged
         if min_time == TimeInt::MIN {

@@ -373,7 +388,7 @@ impl DataStore {
             .flatten()
             .map(|(time, row_id, cells)| (Some(time), row_id, cells));

-        if query.range.min <= TimeInt::MIN {
+        if query.range.min() <= TimeInt::MIN {
             let timeless = self
                 .timeless_tables
                 .get(&ent_path_hash)

@@ -480,9 +495,9 @@ impl IndexedTable {
         let timeline = self.timeline;

         // We need to find the _indexing time_ that corresponds to this time range's minimum bound!
-        let (time_range_min, _) = self.find_bucket(time_range.min);
+        let (time_range_min, _) = self.find_bucket(time_range.min());

-        self.range_buckets(time_range_min..=time_range.max)
+        self.range_buckets(time_range_min..=time_range.max())
             .map(|(_, bucket)| bucket)
             .enumerate()
             .flat_map(move |(bucket_nr, bucket)| {

@@ -778,7 +793,7 @@ impl IndexedBucket {
         "searching for time & component cell numbers…"
     );

-    let time_row_nr = col_time.partition_point(|t| *t < time_range.min.as_i64()) as u64;
+    let time_row_nr = col_time.partition_point(|t| *t < time_range.min().as_i64()) as u64;

    trace!(
        kind = "range",
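Taken together, the new `LatestAtQuery` surface reads like this at call sites — a hedged usage sketch; the `re_data_store`/`re_log_types` import paths are assumptions, while the methods are exactly the ones added above:

use re_data_store::LatestAtQuery;
use re_log_types::{TimeInt, TimeType, Timeline};

fn example() {
    let frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

    // Plain integers coerce through `TryInto<TimeInt>`: no more
    // `.try_into().unwrap()` at every call site, and out-of-range
    // input saturates to `TimeInt::MIN` instead of panicking.
    let query = LatestAtQuery::new(frame_nr, 42);
    assert_eq!(query.at(), TimeInt::new_temporal(42));

    // Fields are private now, so reads go through accessors…
    let _timeline = query.timeline();

    // …and "latest of everything" gets a dedicated constructor.
    let _latest = LatestAtQuery::latest(frame_nr);
}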
10 changes: 5 additions & 5 deletions crates/re_data_store/src/store_sanity.rs
@@ -104,12 +104,12 @@ impl IndexedTable {
         let &[t1, t2] = time_ranges else {
             unreachable!()
         };
-        if t1.max.as_i64() >= t2.min.as_i64() {
+        if t1.max().as_i64() >= t2.min().as_i64() {
             return Err(SanityError::OverlappingBuckets {
-                t1_max: t1.max.as_i64(),
-                t1_max_formatted: self.timeline.typ().format_utc(t1.max),
-                t2_max: t2.max.as_i64(),
-                t2_max_formatted: self.timeline.typ().format_utc(t2.max),
+                t1_max: t1.max().as_i64(),
+                t1_max_formatted: self.timeline.typ().format_utc(t1.max()),
+                t2_max: t2.max().as_i64(),
+                t2_max_formatted: self.timeline.typ().format_utc(t2.max()),
             });
         }
     }
10 changes: 5 additions & 5 deletions crates/re_data_store/src/store_subscriber.rs
@@ -230,9 +230,9 @@ mod tests {
         let row = DataRow::from_component_batches(
             RowId::new(),
             TimePoint::from_iter([
-                (timeline_frame, 42.try_into().unwrap()), //
-                (timeline_other, 666.try_into().unwrap()), //
-                (timeline_yet_another, 1.try_into().unwrap()), //
+                (timeline_frame, 42), //
+                (timeline_other, 666), //
+                (timeline_yet_another, 1), //
             ]),
             "entity_a".into(),
             [&InstanceKey::from_iter(0..10) as _],

@@ -249,8 +249,8 @@ mod tests {
         DataRow::from_component_batches(
             RowId::new(),
             TimePoint::from_iter([
-                (timeline_frame, 42.try_into().unwrap()), //
-                (timeline_yet_another, 1.try_into().unwrap()), //
+                (timeline_frame, 42), //
+                (timeline_yet_another, 1), //
             ]),
             "entity_b".into(),
             [&points as _, &colors as _],
18 changes: 9 additions & 9 deletions crates/re_data_store/src/store_write.rs
@@ -387,13 +387,13 @@ impl IndexedTable {
             re_log::debug_once!("Failed to split bucket on timeline {}", timeline.name());

             if 1 < config.indexed_bucket_num_rows
-                && bucket_time_range.min == bucket_time_range.max
+                && bucket_time_range.min() == bucket_time_range.max()
             {
                 re_log::warn_once!(
                     "Found over {} rows with the same timepoint {:?}={} - perhaps you forgot to update or remove the timeline?",
                     config.indexed_bucket_num_rows,
                     bucket.timeline.name(),
-                    bucket.timeline.typ().format_utc(bucket_time_range.min)
+                    bucket.timeline.typ().format_utc(bucket_time_range.min())
                 );
             }
         }

@@ -454,7 +454,7 @@ impl IndexedBucket {
         }

         col_time.push_back(time.as_i64());
-        *time_range = TimeRange::new(time_range.min.min(time), time_range.max.max(time));
+        *time_range = TimeRange::new(time_range.min().min(time), time_range.max().max(time));
         size_bytes_added += time.as_i64().total_size_bytes();

         // update all control columns

@@ -649,7 +649,7 @@ impl IndexedBucket {
             inner: RwLock::new(inner2),
         };

-        (time_range2.min, bucket2)
+        (time_range2.min(), bucket2)
     };

     inner1.compute_size_bytes();

@@ -775,17 +775,17 @@ fn split_time_range_off(
     times1: &[i64],
     time_range1: &mut TimeRange,
 ) -> TimeRange {
-    let time_range2 = TimeRange::new(TimeInt::new_temporal(times1[split_idx]), time_range1.max);
+    let time_range2 = TimeRange::new(TimeInt::new_temporal(times1[split_idx]), time_range1.max());

     // This can never fail (underflow or OOB) because we never split buckets smaller than 2
     // entries.
-    time_range1.max = TimeInt::new_temporal(times1[split_idx - 1]);
+    time_range1.set_max(times1[split_idx - 1]);

     debug_assert!(
-        time_range1.max.as_i64() < time_range2.min.as_i64(),
+        time_range1.max().as_i64() < time_range2.min().as_i64(),
         "split resulted in overlapping time ranges: {} <-> {}\n{:#?}",
-        time_range1.max.as_i64(),
-        time_range2.min.as_i64(),
+        time_range1.max().as_i64(),
+        time_range2.min().as_i64(),
         (&time_range1, &time_range2),
     );
10 changes: 10 additions & 0 deletions crates/re_data_store/src/test_util.rs
@@ -7,6 +7,16 @@ use crate::{DataStore, DataStoreConfig, WriteError};
 #[doc(hidden)]
 #[macro_export]
 macro_rules! test_row {
+    ($entity:ident => $n:expr; [$c0:expr $(,)*]) => {{
+        ::re_log_types::DataRow::from_cells1_sized(
+            ::re_log_types::RowId::new(),
+            $entity.clone(),
+            ::re_log_types::TimePoint::timeless(),
+            $n,
+            $c0,
+        )
+        .unwrap()
+    }};
     ($entity:ident @ $frames:tt => $n:expr; [$c0:expr $(,)*]) => {{
         ::re_log_types::DataRow::from_cells1_sized(
             ::re_log_types::RowId::new(),
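The new first arm (no `@ $frames`) builds a timeless row, so store tests can exercise static data with the same macro. A hypothetical invocation of both arms — the entity and cells are invented for illustration:

// Hypothetical test snippet; `entity_path` and the cells are invented.
let entity_path = EntityPath::from("some_entity");

// New arm: no timepoint, so the row is timeless.
let timeless = test_row!(entity_path => 10; [InstanceKey::from_iter(0..10)]);

// Pre-existing arm: pinned to frame 42.
let timed = test_row!(entity_path @ [build_frame_nr(42)] => 10; [InstanceKey::from_iter(0..10)]);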