Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batching 3: DataRow & DataTable + no bundles outside of transport #1673

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
msg_bundle::{try_build_msg_bundle2, MsgBundle},
Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
};

// ---
Expand All @@ -27,28 +26,30 @@ const NUM_RECTS: i64 = 1;

// --- Benchmarks ---

// TODO(cmc): need additional benches for full tables

fn insert(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let rows = build_rows(NUM_RECTS as usize);
let mut group = c.benchmark_group("datastore/insert/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(Default::default(), InstanceKey::name(), msgs.iter()));
b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter()));
});
}
}

fn latest_at_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let rows = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter());
let mut group = c.benchmark_group("datastore/latest_at/batch/rects");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
b.iter(|| {
let results = latest_messages_at(&store, Rect2D::name(), &[Rect2D::name()]);
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
Expand All @@ -70,27 +71,27 @@ fn latest_at_missing_components(c: &mut Criterion) {
};

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config.clone(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("primary", |b| {
b.iter(|| {
let results =
latest_messages_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
}

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config, InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config, InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("secondaries", |b| {
b.iter(|| {
let results = latest_messages_at(
let results = latest_data_at(
&store,
Rect2D::name(),
&[
Expand All @@ -109,15 +110,15 @@ fn latest_at_missing_components(c: &mut Criterion) {

fn range_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/range/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_messages(&store, [Rect2D::name()]);
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());
Expand Down Expand Up @@ -146,31 +147,31 @@ criterion_main!(benches);

// --- Helpers ---

fn build_messages(n: usize) -> Vec<MsgBundle> {
fn build_rows(n: usize) -> Vec<DataRow> {
(0..NUM_FRAMES)
.map(move |frame_idx| {
try_build_msg_bundle2(
MsgId::ZERO,
DataRow::from_cells2(
MsgId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
(build_some_instances(n), build_some_rects(n)),
)
.unwrap()
})
.collect()
}

fn insert_messages<'a>(
fn insert_rows<'a>(
config: DataStoreConfig,
cluster_key: ComponentName,
msgs: impl Iterator<Item = &'a MsgBundle>,
rows: impl Iterator<Item = &'a DataRow>,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap());
rows.for_each(|row| store.insert_row(row).unwrap());
store
}

fn latest_messages_at<const N: usize>(
fn latest_data_at<const N: usize>(
store: &DataStore,
primary: ComponentName,
secondaries: &[ComponentName; N],
Expand All @@ -185,7 +186,7 @@ fn latest_messages_at<const N: usize>(
store.get(secondaries, &row_indices)
}

fn range_messages<const N: usize>(
fn range_data<const N: usize>(
store: &DataStore,
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
Expand Down
67 changes: 33 additions & 34 deletions crates/re_arrow_store/examples/dump_dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe
//! ```

use re_arrow_store::{test_bundle, DataStore};
use re_arrow_store::{test_row, DataStore};
use re_log_types::{
component_types::InstanceKey,
datagen::{
Expand All @@ -25,51 +25,50 @@ fn main() {
];

for ent_path in &ent_paths {
let bundle1 = test_bundle!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&bundle1).unwrap();
let row1 = test_row!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => 2; [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&row1).unwrap();
}

for ent_path in &ent_paths {
let bundle2 = test_bundle!(ent_path @ [
build_frame_nr(2.into())
] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [
build_frame_nr(2.into())
] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();
// Insert timelessly too!
let bundle2 =
test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();

let bundle3 = test_bundle!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
// Insert timelessly too!
let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
}

for ent_path in &ent_paths {
let bundle4_1 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&bundle4_1).unwrap();
let row4_1 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&row4_1).unwrap();

let bundle4_15 = test_bundle!(ent_path @ [
build_frame_nr(4.into()),
] => [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&bundle4_15).unwrap();
let row4_15 = test_row!(ent_path @ [
build_frame_nr(4.into()),
] => 3; [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&row4_15).unwrap();

let bundle4_2 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&bundle4_2).unwrap();
let row4_2 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&row4_2).unwrap();

let bundle4_25 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&bundle4_25).unwrap();
let row4_25 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&row4_25).unwrap();
}

let df = store.to_dataframe();
Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```

use re_arrow_store::polars_util::latest_component;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::component_types::Rect2D;
use re_log_types::datagen::build_some_rects;
use re_log_types::{
Expand All @@ -19,11 +19,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use polars_core::prelude::*;
use re_arrow_store::polars_util::latest_components;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -18,11 +18,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let df = latest_components(
Expand Down
30 changes: 15 additions & 15 deletions crates/re_arrow_store/examples/range_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```

use polars_core::prelude::JoinType;
use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange};
use re_arrow_store::{polars_util, test_row, DataStore, RangeQuery, TimeRange};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -22,26 +22,26 @@ fn main() {
let frame3 = 3.into();
let frame4 = 4.into();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 4; [build_some_point2d(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 1; [build_some_point2d(1)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_point2d(3)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into()));
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::{
PersistentIndexTable, RowIndex,
};

// TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.

// ---

impl DataStore {
Expand Down
Loading