Skip to content

Commit

Permalink
Splat batches (#15)
Browse files Browse the repository at this point in the history
* Split out Data::Batch to a new enum, LoggedData

* Add "batch splatting"

* Optimize batch insertion

* Introduce IndexHash

* Refactor: move Index to own file

* struct Batch

* Optimize storage by using hashes instead of full indices and index paths

* Don't use three_d::Color::new

* Don't box IndexKey::index

* Fix crash

* Benchmark data insertion

* Remove hash from IndexPath

* Use IntMap for ObjTypePath

* Less use of IndexKey

* Remove `IndexKey`

* Small optimization

* Clippy fixes
  • Loading branch information
emilk authored Jun 16, 2022
1 parent 61d5a70 commit 7af9294
Show file tree
Hide file tree
Showing 24 changed files with 926 additions and 418 deletions.
165 changes: 129 additions & 36 deletions data_store/benches/data_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use criterion::{criterion_group, criterion_main, Criterion};

use data_store::TypePathDataStore;
use data_store::*;
use log_types::{FieldName, IndexKey, LogId};
use itertools::Itertools;
use log_types::{FieldName, LogId};

/// Number of simulated frames per generated data set.
const NUM_FRAMES: i64 = 1_000; // this can have a big impact on performance
/// Points logged per camera per frame (two cameras: "left"/"right").
const NUM_POINTS_PER_CAMERA: u64 = 1_000;
Expand Down Expand Up @@ -55,14 +56,31 @@ fn obj_path(camera: &str, index: u64) -> ObjPath {
]))
}

/// Object type path shared by every benchmark in this file:
/// `camera / <index> / point / <index>`.
fn type_path() -> ObjTypePath {
    let components = vec![
        TypePathComp::String("camera".into()),
        TypePathComp::Index,
        TypePathComp::String("point".into()),
        TypePathComp::Index,
    ];
    ObjTypePath::new(components)
}

/// Minimal time axis for the benchmarks: a plain integer frame counter.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Time(i64);

fn generate_date(individual_pos: bool, individual_radius: bool) -> TypePathDataStore<Time> {
fn generate_data(individual_pos: bool, individual_radius: bool) -> TypePathDataStore<Time> {
let mut data_store = TypePathDataStore::default();

let type_path = type_path();

let indices = (0..NUM_POINTS_PER_CAMERA)
.map(Index::Sequence)
.collect_vec();
let positions = vec![[1.0_f32; 3]; NUM_POINTS_PER_CAMERA as usize];
let radii = vec![1.0_f32; NUM_POINTS_PER_CAMERA as usize];

for frame in 0..NUM_FRAMES {
let time_value = Time(frame);
let time_value = Time(frame as _);
for camera in ["left", "right"] {
if individual_pos {
for point in 0..NUM_POINTS_PER_CAMERA {
Expand All @@ -77,28 +95,15 @@ fn generate_date(individual_pos: bool, individual_radius: bool) -> TypePathDataS
.unwrap();
}
} else {
let type_path = ObjTypePath::new(vec![
TypePathComp::String("camera".into()),
TypePathComp::Index,
TypePathComp::String("point".into()),
TypePathComp::Index,
]);
let mut index_path_prefix = IndexPath::default();
index_path_prefix.push(Index::String(camera.into()));
index_path_prefix.push(Index::Placeholder);

let batch = Arc::new(
(0..NUM_POINTS_PER_CAMERA)
.map(|pi| {
let pos: [f32; 3] = [1.0, 2.0, 3.0];
(IndexKey::new(Index::Sequence(pi)), pos)
})
.collect(),
);
let batch = Arc::new(Batch::new(&indices, &positions));

data_store
.insert_batch(
&ObjPath::new(type_path, index_path_prefix),
&ObjPath::new(type_path.clone(), index_path_prefix),
FieldName::from("pos"),
time_value,
LogId::random(),
Expand All @@ -120,25 +125,15 @@ fn generate_date(individual_pos: bool, individual_radius: bool) -> TypePathDataS
.unwrap();
}
} else {
let type_path = ObjTypePath::new(vec![
TypePathComp::String("camera".into()),
TypePathComp::Index,
TypePathComp::String("point".into()),
TypePathComp::Index,
]);
let mut index_path_prefix = IndexPath::default();
index_path_prefix.push(Index::String(camera.into()));
index_path_prefix.push(Index::Placeholder);

let batch = Arc::new(
(0..NUM_POINTS_PER_CAMERA)
.map(|pi| (IndexKey::new(Index::Sequence(pi)), 1.0_f32))
.collect(),
);
let batch = Arc::new(Batch::new(&indices, &radii));

data_store
.insert_batch(
&ObjPath::new(type_path, index_path_prefix),
&ObjPath::new(type_path.clone(), index_path_prefix),
FieldName::from("radius"),
time_value,
LogId::random(),
Expand All @@ -152,35 +147,127 @@ fn generate_date(individual_pos: bool, individual_radius: bool) -> TypePathDataS
data_store
}

pub fn criterion_benchmark(c: &mut Criterion) {
/// Benchmarks raw `Batch::new` construction for a large point cloud,
/// reported as elements (points) per second.
fn create_batch_thoughput(c: &mut Criterion) {
    const NUM: usize = 100_000;

    // One sequence index and one dummy position per point.
    let indices: Vec<_> = (0..NUM).map(|pi| Index::Sequence(pi as _)).collect();
    let positions = vec![[1.0_f32; 3]; NUM];

    let mut group = c.benchmark_group("create-batch-throughput");
    group.throughput(criterion::Throughput::Elements(NUM as _));

    group.bench_function("Batch::new", |b| {
        b.iter(|| Batch::new(&indices, &positions));
    });

    group.finish();
}

/// Benchmarks inserting the same shared batch into a fresh store over many
/// frames, reported as total points inserted per second.
fn insert_batch_thoughput(c: &mut Criterion) {
    const NUM_FRAMES: usize = 100;
    const NUM_POINTS: usize = 10_000;

    let indices: Vec<_> = (0..NUM_POINTS)
        .map(|pi| Index::Sequence(pi as _))
        .collect();
    let positions = vec![[1.0_f32; 3]; NUM_POINTS];
    // The batch is built once and cheaply shared (Arc) across iterations.
    let batch = std::sync::Arc::new(Batch::new(&indices, &positions));

    // Common prefix for all points: the "left" camera, with a trailing
    // placeholder that the batch insertion fills in per point.
    let mut index_path_prefix = IndexPath::default();
    index_path_prefix.push(Index::String("left".into()));
    index_path_prefix.push(Index::Placeholder);

    let mut group = c.benchmark_group("insert-batch-throughput");
    group.throughput(criterion::Throughput::Elements(
        (NUM_POINTS * NUM_FRAMES) as _,
    ));

    group.bench_function("insert_batch", |b| {
        b.iter(|| {
            // Start from an empty store so each iteration measures the
            // same amount of insertion work.
            let mut data_store = TypePathDataStore::default();
            for frame in 0..NUM_FRAMES {
                data_store
                    .insert_batch(
                        &ObjPath::new(type_path(), index_path_prefix.clone()),
                        FieldName::from("pos"),
                        Time(frame as _),
                        LogId::random(),
                        batch.clone(),
                    )
                    .unwrap();
            }
            data_store
        });
    });

    group.finish();
}

/// Benchmarks inserting points one at a time (no batching), reported as
/// total points inserted per second.
fn insert_individual_thoughput(c: &mut Criterion) {
    const NUM_FRAMES: usize = 100;
    const NUM_POINTS: usize = 1000;

    // NOTE: the original built an `index_path_prefix` here (copied from
    // `insert_batch_thoughput`) but never used it — `insert_individual`
    // takes a full `ObjPath` via `obj_path(..)` instead. Dead code removed.

    let mut group = c.benchmark_group("insert-individual-throughput");
    group.throughput(criterion::Throughput::Elements(
        (NUM_POINTS * NUM_FRAMES) as _,
    ));

    group.bench_function("insert_individual", |b| {
        b.iter(|| {
            // Fresh store per iteration so every run does identical work.
            let mut data_store = TypePathDataStore::default();
            for frame in 0..NUM_FRAMES {
                let time_value = Time(frame as _);
                for point in 0..NUM_POINTS {
                    data_store
                        .insert_individual::<[f32; 3]>(
                            obj_path("left", point as _),
                            FieldName::from("pos"),
                            time_value,
                            LogId::random(),
                            [1.0, 2.0, 3.0],
                        )
                        .unwrap();
                }
            }
            data_store
        });
    });

    group.finish();
}

fn query_throughput(c: &mut Criterion) {
let mut group = c.benchmark_group("query-points-throughput");
group.throughput(criterion::Throughput::Elements(TOTAL_POINTS as _));

let data_store = generate_date(false, false);
let data_store = generate_data(false, false);
group.bench_function("batched_pos_batched_radius", |b| {
b.iter(|| {
let points = points_from_store(&data_store, &TimeQuery::LatestAt(Time(NUM_FRAMES / 2)));
assert_eq!(points.len(), TOTAL_POINTS as usize);
});
});

let data_store = generate_date(true, true);
let data_store = generate_data(true, true);
group.bench_function("individual_pos_individual_radius", |b| {
b.iter(|| {
let points = points_from_store(&data_store, &TimeQuery::LatestAt(Time(NUM_FRAMES / 2)));
assert_eq!(points.len(), TOTAL_POINTS as usize);
});
});

let data_store = generate_date(false, true);
let data_store = generate_data(false, true);
group.bench_function("batched_pos_individual_radius", |b| {
b.iter(|| {
let points = points_from_store(&data_store, &TimeQuery::LatestAt(Time(NUM_FRAMES / 2)));
assert_eq!(points.len(), TOTAL_POINTS as usize);
});
});

let data_store = generate_date(true, false);
let data_store = generate_data(true, false);
group.bench_function("individual_pos_batched_radius", |b| {
b.iter(|| {
let points = points_from_store(&data_store, &TimeQuery::LatestAt(Time(NUM_FRAMES / 2)));
Expand All @@ -191,5 +278,11 @@ pub fn criterion_benchmark(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, criterion_benchmark);
// Register all benchmarks with criterion's harness and generate `main`.
criterion_group!(
    benches,
    create_batch_thoughput,
    insert_batch_thoughput,
    insert_individual_thoughput,
    query_throughput
);
criterion_main!(benches);
18 changes: 9 additions & 9 deletions data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
}

use data_store::*;
use log_types::{IndexKey, LogId, TimeValue};
use itertools::Itertools;
use log_types::{LogId, TimeValue};

impl TrackingAllocator {
fn used_bytes(&self) -> usize {
Expand Down Expand Up @@ -142,19 +143,18 @@ fn big_clouds_batched() {
const NUM_FRAMES: usize = 100;
const NUM_POINTS_PER_CAMERA: usize = 1_000;

let indices = (0..NUM_POINTS_PER_CAMERA)
.map(|i| Index::Sequence(i as _))
.collect_vec();
let point: [f32; 3] = [1.0, 2.0, 3.0];
let positions = vec![point; NUM_POINTS_PER_CAMERA];

let mut store = TypePathDataStore::default();
let mut frame = 0;
let mut num_points = 0;
while frame < NUM_FRAMES {
for camera in 0..NUM_CAMERAS {
let batch = std::sync::Arc::new(
(0..NUM_POINTS_PER_CAMERA)
.map(|i| {
let point: [f32; 3] = [1.0, 2.0, 3.0];
(IndexKey::new(Index::Sequence(i as _)), point)
})
.collect(),
);
let batch = std::sync::Arc::new(Batch::new(&indices, &positions));
let (obj_type_path, index_path) =
obj_path(camera as _, 0).into_type_path_and_index_path();
let (index_path_prefix, _) = index_path.replace_last_with_placeholder();
Expand Down
Loading

0 comments on commit 7af9294

Please sign in to comment.