arrow datastore: read+write query path MVP (#428)
* object path => entity path

* move utils from lib.rs to dedicated file

* color_rgba -> color_srgba_unmultiplied

* getting intimate with arrow's datamodel

* getting _even more_ intimate with arrow's datamodel

* split it

* building dem index keys

* disgustingly, incorrectly inserting components all the way down

* timelines need no list

* similarly clarifying the nested listing situation, on the components side this time

* make sure it looks like it should!

* actual integration tests

* bootstrapping text-based debugging

* bootstrapping indices

* introducing TypedTimeInt everywhere

* full index sorting

* auto-inserting empty component lists in starting buckets

* better datagen tools

* bidirectional merges for indices + properly showing NULLs in dataframes

* finally can show off some more advanced ingestion patterns!

* dealing with corrupt validity bitmaps, and the sheer size of my stupidity

* read path taking its first steps: latest_at for indices!

* look! it's a read path!

* it works!

* show the resulting dataframe duh

* clean up pass #1: task log

* clean up pass #2: moving everybody where they belong

* clean up pass #3: definitions

* a minimal solution for missing components

* some more cleanup

* porting relevant TODOs into issues

* appeasing the CI deities

* merge catastrophe

* they see me cleanin', they hatin'

* * Reorg of re_arrow_store
* Removed old ArrowDB code
* Connected app data ingest into new DataStore

* fix broken doc links

* store files prefixed with store_

* integration tests in integration folder + exposing datagen tools to everyone

* make integration tests scale to more complex scenarios

* adding currently failing scenario: query before any data present

* added failing test and scenarios for all emptiness-related edge cases

* better testing tools

* fixing broken edge cases on read path

* demonstrating faulty read behavior in roundtrip test

* fixing dem faulty swaps

* when the doc itself demonstrates bugs :x

* adding baseline bench somewhat mimicking the legacy ones, though it doesn't really make sense anymore

* exploding query results so you can actually do stuff with them

* properly testing all halfway frames (and, unsurprisingly, failing!)

* properly dealing with multi-row primary indices

* less verbose scenarios for end-to-end latest_at tests

* addressing misc PR comments

* TimeReal, TimeRange & TimeRangeF are now a property of re_log_types™

* retiring TypedTimeRange before Emil tries to hurt it

* mark unreachable as such

* replaced binary_search with a partition_point (see the sketch right after this list)

* using entity path hashes directly in indexing data structures (also sketched after this list)

* re_viewer don't need those no more
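
To illustrate the binary_search -> partition_point swap called out above, here is a minimal sketch of a latest-at lookup over a sorted primary index; the `times` slice and the `latest_at` helper are hypothetical stand-ins, not the actual store code:

fn latest_at(times: &[i64], query_time: i64) -> Option<usize> {
    // `partition_point` returns the index of the first element for which the
    // predicate becomes false, i.e. the number of entries with time <= query_time.
    // Unlike `binary_search`, it doesn't care whether the exact time exists,
    // nor which of several equal times it happens to land on.
    let idx = times.partition_point(|&t| t <= query_time);
    // The latest row at or before `query_time` sits just before that point.
    idx.checked_sub(1)
}

fn main() {
    let times = [10, 20, 20, 30];
    assert_eq!(latest_at(&times, 25), Some(2)); // latest entry at or before 25
    assert_eq!(latest_at(&times, 5), None); // query before any data present
}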
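
In the same spirit, a hedged sketch of what keying indices directly on entity path hashes can look like, using the nohash-hasher crate from the dependency list below; the `EntityPathHash` alias and the `IndexTable` layout are illustrative assumptions, not the real datastore types:

use nohash_hasher::IntMap; // a HashMap for keys that are already hashes

// Assumption for illustration: the store keys its index tables by a
// pre-computed 64-bit hash of the entity path, so lookups never have to
// re-hash the full path.
type EntityPathHash = u64;

#[derive(Default)]
struct IndexTable {
    row_indices: Vec<u64>, // stand-in for the real per-entity index contents
}

fn main() {
    // `IntMap` is a `HashMap` with a no-op hasher: the u64 key is used as-is.
    let mut indices: IntMap<EntityPathHash, IndexTable> = IntMap::default();
    let rects_hash: EntityPathHash = 0xdead_beef; // normally: hash of "rects"
    indices.entry(rects_hash).or_default().row_indices.push(42);
    assert_eq!(indices[&rects_hash].row_indices, vec![42]);
}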

Co-authored-by: John Hughes <jondo2010@gmail.com>
Co-authored-by: Emil Ernerfeldt <emil.ernerfeldt@gmail.com>
3 people committed Dec 5, 2022
1 parent c4c0c7d commit bd69add
Showing 27 changed files with 1,856 additions and 457 deletions.
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

38 changes: 35 additions & 3 deletions crates/re_arrow_store/Cargo.toml
@@ -6,19 +6,51 @@
rust-version.workspace = true
license.workspace = true
publish = false

[features]
## Enables the `datagen` module, which exposes a number of tools for generating random data for
## tests and benchmarks.
default = []
datagen = ["dep:rand"]

[dependencies]
# Rerun
re_log = { path = "../re_log" }
re_log_types = { path = "../re_log_types" }

# External
anyhow = "1.0"
arrow2 = { version = "0.14", features = ["io_ipc", "compute_concatenate"] }
arrow2_convert = { version = "0.3" }
chrono = "0.4"
document-features = "0.2"
indent = "0.1"
nohash-hasher = "0.2"
thiserror = "1.0"

# External (optional)
rand = { version = "0.8.5", optional = true }

# native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
polars = { version = "0.25", features = [
    "dtype-time",
    "dtype-struct",
    "fmt",
], default-features = false }

# web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
polars = { version = "0.25", features = [
    "dtype-time",
    "dtype-struct",
], default-features = false }

[dev-dependencies]
criterion = "0.3"
itertools = "0.10"
mimalloc = "0.1"
tracing-subscriber = "0.3"

[[bench]]
name = "data_store"
harness = false
88 changes: 88 additions & 0 deletions crates/re_arrow_store/benches/data_store.rs
@@ -0,0 +1,88 @@
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use polars::prelude::DataFrame;

use re_arrow_store::{datagen::*, DataStore, TimeQuery};
use re_log_types::{ObjPath as EntityPath, TimeType, Timeline};

// ---

#[cfg(not(debug_assertions))]
const NUM_FRAMES: i64 = 100;
#[cfg(not(debug_assertions))]
const NUM_RECTS: i64 = 100;

// `cargo test` also runs the benchmark setup code, so make sure they run quickly:
#[cfg(debug_assertions)]
const NUM_FRAMES: i64 = 1;
#[cfg(debug_assertions)]
const NUM_RECTS: i64 = 1;

// --- Benchmarks ---

fn batch_rects(c: &mut Criterion) {
    let msgs = build_messages(NUM_RECTS as usize);

    {
        let mut group = c.benchmark_group("datastore/batch/rects");
        group.throughput(criterion::Throughput::Elements(
            (NUM_RECTS * NUM_FRAMES) as _,
        ));
        group.bench_function("insert", |b| {
            b.iter(|| insert_messages(&msgs));
        });
    }

    {
        let mut group = c.benchmark_group("datastore/batch/rects");
        group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
        let mut store = insert_messages(&msgs);
        group.bench_function("query", |b| {
            b.iter(|| query_messages(&mut store));
        });
    }
}

criterion_group!(benches, batch_rects);
criterion_main!(benches);

// --- Helpers ---

fn build_messages(n: usize) -> Vec<(Schema, Chunk<Box<dyn Array>>)> {
    let ent_path = EntityPath::from("rects");

    (0..NUM_FRAMES)
        .into_iter()
        .map(|frame_idx| {
            build_message(
                &ent_path,
                [build_frame_nr(frame_idx)],
                [build_positions(n), build_rects(n)],
            )
        })
        .collect()
}

fn insert_messages(msgs: &[(Schema, Chunk<Box<dyn Array>>)]) -> DataStore {
    let mut store = DataStore::default();
    for (schema, data) in msgs {
        store.insert(schema, data).unwrap();
    }
    store
}

fn query_messages(store: &mut DataStore) -> DataFrame {
    let time_query = TimeQuery::LatestAt(NUM_FRAMES / 2);
    let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
    let ent_path = EntityPath::from("rects");

    let df = store
        .query(&timeline_frame_nr, &time_query, &ent_path, &["rects"])
        .unwrap();
    assert_eq!(NUM_RECTS as usize, df.select_at_idx(0).unwrap().len());

    df
}
