
Commit

Merge branch 'main' into grtlr/re-uri
grtlr authored Feb 19, 2025
2 parents 872a836 + 2aa3014 commit 3ad1e1d
Showing 60 changed files with 2,029 additions and 324 deletions.
48 changes: 36 additions & 12 deletions crates/build/re_types_builder/src/codegen/python/mod.rs
@@ -2488,6 +2488,17 @@ fn quote_clear_methods(obj: &Object) -> String {
))
}

fn quote_kwargs(obj: &Object) -> String {
obj.fields
.iter()
.map(|field| {
let field_name = field.snake_case_name();
format!("'{field_name}': {field_name}")
})
.collect_vec()
.join(",\n")
}
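For context (not part of the commit): for a hypothetical object whose fields are `positions` and `colors`, `quote_kwargs` emits the body of a Python dict literal, one `'name': name` entry per field joined with ",\n", which the callers below indent and splice into the generated methods. A minimal sketch of the expected output:

fn main() {
    // Hypothetical field names, used only for illustration.
    let expected = "'positions': positions,\n'colors': colors";
    // This string becomes the body of the `kwargs = {...}` literal in the generated Python.
    println!("{expected}");
}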

fn quote_partial_update_methods(reporter: &Reporter, obj: &Object, objects: &Objects) -> String {
let name = &obj.name;

@@ -2503,15 +2514,7 @@ fn quote_partial_update_methods(reporter: &Reporter, obj: &Object, objects: &Obj
.join(",\n");
let parameters = indent::indent_by(8, parameters);

let kwargs = obj
.fields
.iter()
.map(|field| {
let field_name = field.snake_case_name();
format!("'{field_name}': {field_name}")
})
.collect_vec()
.join(",\n");
let kwargs = quote_kwargs(obj);
let kwargs = indent::indent_by(12, kwargs);

let parameter_docs = compute_init_parameter_docs(reporter, obj, objects);
@@ -2611,6 +2614,9 @@ fn quote_columnar_methods(reporter: &Reporter, obj: &Object, objects: &Objects)
};
let doc_block = indent::indent_by(12, quote_doc_lines(doc_string_lines));

let kwargs = quote_kwargs(obj);
let kwargs = indent::indent_by(12, kwargs);

// NOTE: Calling `update_fields` is not an option: we need to be able to pass
// plural data, even to singular fields (mono-components).
unindent(&format!(
@@ -2632,11 +2638,29 @@ fn quote_columnar_methods(reporter: &Reporter, obj: &Object, objects: &Objects)
if len(batches) == 0:
return ComponentColumnList([])
lengths = np.ones(len(batches[0]._batch.as_arrow_array()))
columns = [batch.partition(lengths) for batch in batches]
kwargs = {{
{kwargs}
}}
columns = []
for batch in batches:
arrow_array = batch.as_arrow_array()
# For primitive arrays, we infer partition size from the input shape.
if pa.types.is_primitive(arrow_array.type):
param = kwargs[batch.component_descriptor().archetype_field_name] # type: ignore[index]
shape = np.shape(param) # type: ignore[arg-type]
batch_length = shape[1] if len(shape) > 1 else 1
num_rows = shape[0] if len(shape) >= 1 else 1
sizes = batch_length * np.ones(num_rows)
else:
# For non-primitive types, default to partitioning each element separately.
sizes = np.ones(len(arrow_array))
indicator_column = cls.indicator().partition(np.zeros(len(lengths)))
columns.append(batch.partition(sizes))
indicator_column = cls.indicator().partition(np.zeros(len(sizes)))
return ComponentColumnList([indicator_column] + columns)
"#
))
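The generated method above infers per-row partition lengths from the shape of the user-supplied value: a primitive component passed as an array of shape (num_rows, batch_length, ...) yields one partition of batch_length elements per row, while flat inputs fall back to one element per row. The following is a minimal sketch of that rule over a plain shape tuple, with hypothetical shapes; it mirrors the generated Python and is not part of the commit:

fn partition_sizes(shape: &[usize]) -> Vec<usize> {
    // Rows come from the first dimension (default 1), per-row length from the second (default 1).
    let num_rows = shape.first().copied().unwrap_or(1);
    let batch_length = if shape.len() > 1 { shape[1] } else { 1 };
    vec![batch_length; num_rows]
}

fn main() {
    assert_eq!(partition_sizes(&[4, 3, 2]), vec![3; 4]); // 4 rows of 3 elements each
    assert_eq!(partition_sizes(&[5]), vec![1; 5]);       // 5 scalar rows
    assert_eq!(partition_sizes(&[]), vec![1]);           // a single scalar value
}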
16 changes: 8 additions & 8 deletions crates/store/re_chunk/src/transport.rs
@@ -63,11 +63,11 @@ impl Chunk {
} = info;

let array = info.times_array();
let schema = re_sorbet::IndexColumnDescriptor {
timeline: *timeline,
datatype: array.data_type().clone(),
is_sorted: *is_sorted,
};

debug_assert_eq!(&timeline.datatype(), array.data_type());

let schema =
re_sorbet::IndexColumnDescriptor::from_timeline(*timeline, *is_sorted);

(schema, into_arrow_ref(array))
})
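The struct literal is replaced by `IndexColumnDescriptor::from_timeline`, and the new `debug_assert_eq!` checks the invariant that replacement relies on: the column's Arrow datatype must match the timeline's own datatype. A rough sketch of what such a constructor could look like, assuming it simply derives the datatype from the timeline (field names and visibility here are assumptions, not the actual re_sorbet code):

impl IndexColumnDescriptor {
    pub fn from_timeline(timeline: Timeline, is_sorted: bool) -> Self {
        Self {
            datatype: timeline.datatype(), // derived from the timeline instead of read off the column
            timeline,
            is_sorted,
        }
    }
}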
@@ -165,17 +165,17 @@ impl Chunk {
let times =
TimeColumn::read_array(&as_array_ref(column.clone())).map_err(|err| {
ChunkError::Malformed {
reason: format!("Bad time column '{}': {err}", schema.name()),
reason: format!("Bad time column '{}': {err}", schema.column_name()),
}
})?;

let time_column =
TimeColumn::new(schema.is_sorted.then_some(true), timeline, times);
TimeColumn::new(schema.is_sorted().then_some(true), timeline, times);
if timelines.insert(timeline, time_column).is_some() {
return Err(ChunkError::Malformed {
reason: format!(
"time column '{}' was specified more than once",
schema.name(),
timeline.name()
),
});
}
36 changes: 35 additions & 1 deletion crates/store/re_chunk_store/src/dataframe.rs
@@ -66,11 +66,45 @@ pub struct TimeColumnSelector {
pub timeline: TimelineName,
}

impl From<TimelineName> for TimeColumnSelector {
#[inline]
fn from(timeline: TimelineName) -> Self {
Self { timeline }
}
}

impl From<Timeline> for TimeColumnSelector {
#[inline]
fn from(timeline: Timeline) -> Self {
Self {
timeline: *timeline.name(),
}
}
}

impl From<&str> for TimeColumnSelector {
#[inline]
fn from(timeline: &str) -> Self {
Self {
timeline: timeline.into(),
}
}
}

impl From<String> for TimeColumnSelector {
#[inline]
fn from(timeline: String) -> Self {
Self {
timeline: timeline.into(),
}
}
}

impl From<IndexColumnDescriptor> for TimeColumnSelector {
#[inline]
fn from(desc: IndexColumnDescriptor) -> Self {
Self {
timeline: *desc.timeline.name(),
timeline: *desc.timeline().name(),
}
}
}
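Together with the `From<IndexColumnDescriptor>` impl above, these conversions let callers build a selector from whichever handle they already hold. A hypothetical usage sketch; the timeline name and the `Timeline::new_temporal` constructor are assumptions for illustration:

let by_name: TimeColumnSelector = TimelineName::from("log_time").into();
let by_timeline: TimeColumnSelector = Timeline::new_temporal("log_time").into();
let by_str: TimeColumnSelector = "log_time".into();
let by_string: TimeColumnSelector = String::from("log_time").into();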
162 changes: 120 additions & 42 deletions crates/store/re_chunk_store/src/query.rs
@@ -1,6 +1,6 @@
use std::{collections::BTreeSet, sync::Arc};

use itertools::Itertools;
use itertools::{Either, Itertools};
use nohash_hasher::IntSet;

use re_chunk::{Chunk, LatestAtQuery, RangeQuery};
@@ -587,40 +587,78 @@ impl ChunkStore {
chunks
}

/// Returns the most-relevant _temporal_ chunk(s) for the given [`LatestAtQuery`].
/// Returns the most-relevant chunk(s) for the given [`LatestAtQuery`].
///
/// The returned vector is guaranteed free of duplicates, by definition.
/// Optionally include static data.
///
/// The [`ChunkStore`] always works at the [`Chunk`] level (as opposed to the row level): it is
/// oblivious to the data therein.
/// For that reason, and because [`Chunk`]s are allowed to temporally overlap, it is possible
/// that a query has more than one relevant chunk.
///
/// The returned vector is free of duplicates.
///
/// The caller should filter the returned chunks further (see [`Chunk::latest_at`]) in order to
/// determine what exact row contains the final result.
///
/// **This ignores static data.**
pub fn latest_at_relevant_chunks_for_all_components(
&self,
query: &LatestAtQuery,
entity_path: &EntityPath,
include_static: bool,
) -> Vec<Arc<Chunk>> {
re_tracing::profile_function!(format!("{query:?}"));

let chunks = self
.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.and_then(|temporal_chunk_ids_per_time| {
self.latest_at(query, temporal_chunk_ids_per_time)
})
.unwrap_or_default();

debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());

chunks
if include_static {
let empty = Default::default();
let static_chunks_per_component = self
.static_chunk_ids_per_entity
.get(entity_path)
.unwrap_or(&empty);

// All static chunks for the given entity
let static_chunks = static_chunks_per_component
.values()
.filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
.cloned();

// All temporal chunks for the given entity, filtered by components
// for which we already have static chunks.
let temporal_chunks = self
.temporal_chunk_ids_per_entity_per_component
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline_per_component| {
temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
})
.map(|temporal_chunk_ids_per_component| {
temporal_chunk_ids_per_component
.iter()
.filter(|(component_name, _)| {
!static_chunks_per_component.contains_key(component_name)
})
.map(|(_, chunk_id_set)| chunk_id_set)
})
.into_iter()
.flatten()
.filter_map(|temporal_chunk_ids_per_time| {
self.latest_at(query, temporal_chunk_ids_per_time)
})
.flatten()
// The latest_at queries may yield duplicate chunks, and it's unlikely
// anyone will use this without also deduplicating first.
.unique_by(|chunk| chunk.id());

static_chunks.chain(temporal_chunks).collect_vec()
} else {
self.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.and_then(|temporal_chunk_ids_per_time| {
self.latest_at(query, temporal_chunk_ids_per_time)
})
.unwrap_or_default()
}
}
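A hypothetical call site for the extended signature, assuming `store`, `query` (a `LatestAtQuery`), and `entity_path` already exist:

// Every chunk that could hold the latest value for this entity at the query time,
// with per-component static chunks shadowing their temporal counterparts:
let chunks =
    store.latest_at_relevant_chunks_for_all_components(&query, &entity_path, /* include_static */ true);
// Each candidate still has to be narrowed down (e.g. via `Chunk::latest_at`)
// to find the row that actually wins.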

fn latest_at(
Expand Down Expand Up @@ -743,49 +781,89 @@ impl ChunkStore {
chunks
}

/// Returns the most-relevant _temporal_ chunk(s) for the given [`RangeQuery`].
///
/// The returned vector is guaranteed free of duplicates, by definition.
/// Returns the most-relevant chunk(s) for the given [`RangeQuery`].
///
/// The criteria for returning a chunk is only that it may contain data that overlaps with
/// the queried range.
/// the queried range, or that it is static.
///
/// The returned vector is free of duplicates.
///
/// The caller should filter the returned chunks further (see [`Chunk::range`]) in order to
/// determine how exactly each row of data fit with the rest.
///
/// **This ignores static data.**
pub fn range_relevant_chunks_for_all_components(
&self,
query: &RangeQuery,
entity_path: &EntityPath,
include_static: bool,
) -> Vec<Arc<Chunk>> {
re_tracing::profile_function!(format!("{query:?}"));

let chunks = self
.range(
query,
self.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.into_iter(),
let empty = Default::default();
let chunks = if include_static {
let static_chunks_per_component = self
.static_chunk_ids_per_entity
.get(entity_path)
.unwrap_or(&empty);

// All static chunks for the given entity
let static_chunks = static_chunks_per_component
.values()
.filter_map(|chunk_id| self.chunks_per_chunk_id.get(chunk_id))
.cloned();

// All temporal chunks for the given entity, filtered by components
// for which we already have static chunks.
let temporal_chunks = self
.range(
query,
self.temporal_chunk_ids_per_entity_per_component
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline_per_component| {
temporal_chunk_ids_per_timeline_per_component.get(&query.timeline())
})
.map(|temporal_chunk_ids_per_component| {
temporal_chunk_ids_per_component
.iter()
.filter(|(component_name, _)| {
!static_chunks_per_component.contains_key(component_name)
})
.map(|(_, chunk_id_set)| chunk_id_set)
})
.into_iter()
.flatten(),
)
.into_iter()
// The range query may yield duplicate chunks, and it's unlikely
// anyone will use this without deduplicating first.
.unique_by(|chunk| chunk.id());

Either::Left(static_chunks.chain(temporal_chunks))
} else {
Either::Right(
self.range(
query,
self.temporal_chunk_ids_per_entity
.get(entity_path)
.and_then(|temporal_chunk_ids_per_timeline| {
temporal_chunk_ids_per_timeline.get(&query.timeline())
})
.into_iter(),
),
)
};

// Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
// need to make sure that the resulting chunks' global time ranges intersect with the
// time range of the query itself.
chunks
.into_iter()
// Post-processing: `Self::range` doesn't have access to the chunk metadata, so now we
// need to make sure that the resulting chunks' global time ranges intersect with the
// time range of the query itself.
.filter(|chunk| {
chunk
.timelines()
.get(&query.timeline())
.is_some_and(|time_column| time_column.time_range().intersects(query.range()))
})
.collect_vec();

debug_assert!(chunks.iter().map(|chunk| chunk.id()).all_unique());

chunks
.collect_vec()
}
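As in the latest-at case, the static branch chains the entity's static chunks with component-filtered temporal chunks (the two differently typed iterators are unified with `itertools::Either`), while the non-static branch keeps the previous behaviour. A hypothetical call site, assuming `store`, `range_query`, and `entity_path` already exist:

// Every chunk that may overlap the queried range, plus the entity's static chunks:
let chunks =
    store.range_relevant_chunks_for_all_components(&range_query, &entity_path, /* include_static */ true);
// These are only candidates: `Chunk::range` decides how each row actually fits.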

fn range<'a>(