-
Notifications
You must be signed in to change notification settings - Fork 222
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fc7d78c
commit 44587fd
Showing
6 changed files
with
383 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,4 +2,5 @@ | |
// SPDX-FileCopyrightText: Copyright The Lance Authors | ||
|
||
pub mod datagen; | ||
pub mod object_store; | ||
pub mod util; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
use std::{ | ||
fmt::{Debug, Display}, | ||
ops::Range, | ||
sync::Arc, | ||
time::Duration, | ||
}; | ||
|
||
use futures::stream::BoxStream; | ||
use lance_io::object_store::WrappingObjectStore; | ||
use object_store::{ | ||
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, | ||
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, | ||
}; | ||
use rand::prelude::Distribution; | ||
|
||
///! A trait that determines how long to sleep for a given path. | ||
pub trait SleepDeterminer: Display + Debug + Clone + Send + Sync + 'static { | ||
fn sleep(&self, path: &Path, range: Option<GetRange>) -> Duration; | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct NoSleep; | ||
|
||
impl Display for NoSleep { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "NoSleep") | ||
} | ||
} | ||
|
||
impl SleepDeterminer for NoSleep { | ||
fn sleep(&self, _path: &Path, _range: Option<GetRange>) -> Duration { | ||
Duration::ZERO | ||
} | ||
} | ||
|
||
///! A sleep determiner that sleeps for a random amount of time based on a gassuian distribution. | ||
///! the arguments are the mean and standard deviation of the distribution, in seconds. | ||
#[derive(Debug, Clone)] | ||
pub struct GassuianSleepGenerator(pub f32, pub f32); | ||
|
||
impl Display for GassuianSleepGenerator { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "GassuianSleepGenerator({}, {})", self.0, self.1) | ||
} | ||
} | ||
|
||
impl SleepDeterminer for GassuianSleepGenerator { | ||
fn sleep(&self, _path: &Path, _range: Option<GetRange>) -> Duration { | ||
let normal = rand::distributions::Standard; | ||
let mut rng = rand::thread_rng(); | ||
let sleep_duration: f32 = normal.sample(&mut rng); | ||
let sleep_duration = sleep_duration * self.1 + self.0; | ||
if sleep_duration < 0.0 { | ||
Duration::ZERO | ||
} else { | ||
Duration::from_secs_f32(sleep_duration) | ||
} | ||
} | ||
} | ||
|
||
///! A sleep determiner that sleeps if a coin flip is heads (can be unfair) | ||
///! the argument is the probability of sleeping. | ||
#[derive(Debug, Clone)] | ||
pub struct MaybeSleepByChance<S: SleepDeterminer>(pub f32, pub S); | ||
|
||
impl<S: SleepDeterminer> Display for MaybeSleepByChance<S> { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "MaybeSleepByChance({}, {})", self.0, self.1) | ||
} | ||
} | ||
|
||
impl<S: SleepDeterminer> SleepDeterminer for MaybeSleepByChance<S> { | ||
fn sleep(&self, _path: &Path, _range: Option<GetRange>) -> Duration { | ||
let normal = rand::distributions::Uniform::new(0.0, 1.0); | ||
let mut rng = rand::thread_rng(); | ||
let sleep: f32 = normal.sample(&mut rng); | ||
if sleep < self.0 { | ||
self.1.sleep(_path, _range) | ||
} else { | ||
Duration::ZERO | ||
} | ||
} | ||
} | ||
|
||
///! An object store that simulates slow operations. | ||
///! This is useful for testing the behavior of the system under slow object store conditions. | ||
///! The sleep duration is determined by the `sleep_hook` function. | ||
///! | ||
///! `sleep_hook` may also decide to not sleep at all by returning `Duration::ZERO`. | ||
///! this is useful for ablation studies. | ||
#[derive(Debug)] | ||
pub struct SleepyObjectStore<S: SleepDeterminer> { | ||
inner: Arc<dyn ObjectStore>, | ||
|
||
sleep_hook: S, | ||
} | ||
|
||
impl<S: SleepDeterminer> Display for SleepyObjectStore<S> { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!( | ||
f, | ||
"SleepyObjectStore(inner: {}, sleep_hook: {}", | ||
self.inner, self.sleep_hook | ||
) | ||
} | ||
} | ||
|
||
impl<S: SleepDeterminer> SleepyObjectStore<S> { | ||
pub fn new(inner: Arc<dyn ObjectStore>, sleep_hook: S) -> Self { | ||
Self { inner, sleep_hook } | ||
} | ||
|
||
async fn do_sleep(&self, path: &Path, range: Option<GetRange>) { | ||
let sleep_duration = self.sleep_hook.sleep(path, range); | ||
if sleep_duration != Duration::ZERO { | ||
tokio::time::sleep(sleep_duration).await; | ||
} | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl<S: SleepDeterminer> ObjectStore for SleepyObjectStore<S> { | ||
async fn put_opts( | ||
&self, | ||
location: &Path, | ||
payload: PutPayload, | ||
opts: PutOptions, | ||
) -> Result<PutResult> { | ||
self.inner.put_opts(location, payload, opts).await | ||
} | ||
|
||
async fn put_multipart_opts( | ||
&self, | ||
location: &Path, | ||
opts: PutMultipartOpts, | ||
) -> Result<Box<dyn MultipartUpload>> { | ||
// just don't sleep here for now, we need a custom stream impl to do this | ||
self.inner.put_multipart_opts(location, opts).await | ||
} | ||
|
||
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { | ||
self.do_sleep(location, options.range.clone()).await; | ||
self.inner.get_opts(location, options).await | ||
} | ||
|
||
async fn delete(&self, location: &Path) -> Result<()> { | ||
self.inner.delete(location).await | ||
} | ||
|
||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> { | ||
self.inner.list(prefix) | ||
} | ||
|
||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { | ||
self.inner.list_with_delimiter(prefix).await | ||
} | ||
|
||
async fn copy(&self, from: &Path, to: &Path) -> Result<()> { | ||
self.inner.copy(from, to).await | ||
} | ||
|
||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { | ||
self.inner.copy_if_not_exists(from, to).await | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
struct SleepyObjectStoreWrapper<S: SleepDeterminer>(pub S); | ||
|
||
impl<S: SleepDeterminer> WrappingObjectStore for SleepyObjectStoreWrapper<S> { | ||
fn wrap(&self, original: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> { | ||
Arc::new(SleepyObjectStore::new(original, self.0.clone())) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,3 +129,7 @@ harness = false | |
[[bench]] | ||
name = "ivf_pq" | ||
harness = false | ||
|
||
[[bench]] | ||
name = "take" | ||
harness = false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// SPDX-FileCopyrightText: Copyright The Lance Authors | ||
|
||
use arrow_array::{ | ||
BinaryArray, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchIterator, | ||
}; | ||
use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema}; | ||
use criterion::{criterion_group, criterion_main, Criterion}; | ||
|
||
use lance::{ | ||
arrow::FixedSizeListArrayExt, | ||
dataset::{builder::DatasetBuilder, ProjectionRequest}, | ||
}; | ||
use lance_table::io::commit::RenameCommitHandler; | ||
use object_store::ObjectStore; | ||
#[cfg(target_os = "linux")] | ||
use pprof::criterion::{Output, PProfProfiler}; | ||
use rand::Rng; | ||
use std::{sync::Arc, time::Duration}; | ||
use url::Url; | ||
|
||
use lance::dataset::{Dataset, WriteMode, WriteParams}; | ||
|
||
const BATCH_SIZE: u64 = 1024; | ||
|
||
fn gen_ranges(num_rows: u64, file_size: u64, n: usize) -> Vec<u64> { | ||
let mut rng = rand::thread_rng(); | ||
let mut ranges = Vec::with_capacity(n); | ||
for i in 0..n { | ||
ranges.push(rng.gen_range(1..num_rows)); | ||
ranges[i] = ((ranges[i] / file_size) << 32) | (ranges[i] % file_size); | ||
} | ||
|
||
ranges | ||
} | ||
|
||
fn bench_random_take(c: &mut Criterion) { | ||
// default tokio runtime | ||
let rt = tokio::runtime::Runtime::new().unwrap(); | ||
let num_batches = 1024; | ||
|
||
for file_size in [1024 * 1024, 1024] { | ||
let dataset = rt.block_on(create_dataset( | ||
"memory://test.lance", | ||
false, | ||
num_batches, | ||
file_size, | ||
)); | ||
let schema = Arc::new(dataset.schema().clone()); | ||
|
||
for num_rows in [1, 10, 100, 1000] { | ||
c.bench_function(&format!( | ||
"V1 Random Take ({file_size} file size, {num_batches} batches, {num_rows} rows per take)" | ||
), |b| { | ||
b.to_async(&rt).iter(|| async { | ||
let rows = gen_ranges(num_batches as u64 * BATCH_SIZE, file_size as u64, num_rows); | ||
let batch = dataset | ||
.take_rows(&rows, ProjectionRequest::Schema(schema.clone())) | ||
.await | ||
.expect(&format!("rows: {:?}", rows)); | ||
assert_eq!(batch.num_rows(), num_rows); | ||
}) | ||
}); | ||
} | ||
|
||
let dataset = rt.block_on(create_dataset( | ||
"memory://test.lance", | ||
true, | ||
num_batches, | ||
file_size, | ||
)); | ||
let schema = Arc::new(dataset.schema().clone()); | ||
for num_rows in [1, 10, 100, 1000] { | ||
c.bench_function(&format!( | ||
"V2 Random Take ({file_size} file size, {num_batches} batches, {num_rows} rows per take)" | ||
), |b| { | ||
b.to_async(&rt).iter(|| async { | ||
let batch = dataset | ||
.take_rows(&gen_ranges(num_batches as u64 * BATCH_SIZE, file_size as u64, num_rows), ProjectionRequest::Schema(schema.clone())) | ||
.await | ||
.unwrap(); | ||
assert_eq!(batch.num_rows(), num_rows); | ||
}) | ||
}); | ||
} | ||
} | ||
} | ||
|
||
async fn create_dataset(path: &str, use_v2: bool, num_batches: i32, file_size: i32) -> Dataset { | ||
let store = create_file( | ||
std::path::Path::new(path), | ||
WriteMode::Create, | ||
use_v2, | ||
num_batches, | ||
file_size, | ||
) | ||
.await; | ||
|
||
DatasetBuilder::from_uri(path) | ||
.with_object_store( | ||
store, | ||
Url::parse(path).unwrap(), | ||
Arc::new(RenameCommitHandler), | ||
) | ||
.load() | ||
.await | ||
.unwrap() | ||
} | ||
|
||
async fn create_file( | ||
path: &std::path::Path, | ||
mode: WriteMode, | ||
use_v2: bool, | ||
num_batches: i32, | ||
file_size: i32, | ||
) -> Arc<dyn ObjectStore> { | ||
let schema = Arc::new(ArrowSchema::new(vec![ | ||
Field::new("i", DataType::Int32, false), | ||
Field::new("f", DataType::Float32, false), | ||
Field::new("s", DataType::Binary, false), | ||
Field::new( | ||
"fsl", | ||
DataType::FixedSizeList( | ||
FieldRef::new(Field::new("item", DataType::Float32, true)), | ||
2, | ||
), | ||
false, | ||
), | ||
Field::new("blob", DataType::Binary, false), | ||
])); | ||
let batch_size = BATCH_SIZE as i32; | ||
let batches: Vec<RecordBatch> = (0..num_batches) | ||
.map(|i| { | ||
RecordBatch::try_new( | ||
schema.clone(), | ||
vec![ | ||
Arc::new(Int32Array::from_iter_values( | ||
i * batch_size..(i + 1) * batch_size, | ||
)), | ||
Arc::new(Float32Array::from_iter_values( | ||
(i * batch_size..(i + 1) * batch_size) | ||
.map(|x| x as f32) | ||
.collect::<Vec<_>>(), | ||
)), | ||
Arc::new(BinaryArray::from_iter_values( | ||
(i * batch_size..(i + 1) * batch_size) | ||
.map(|x| format!("blob-{}", x).into_bytes()), | ||
)), | ||
Arc::new( | ||
FixedSizeListArray::try_new_from_values( | ||
Float32Array::from_iter_values( | ||
(i * batch_size..(i + 2) * batch_size) | ||
.map(|x| (batch_size + (x - batch_size) / 2) as f32), | ||
), | ||
2, | ||
) | ||
.unwrap(), | ||
), | ||
Arc::new(BinaryArray::from_iter_values( | ||
(i * batch_size..(i + 1) * batch_size) | ||
.map(|x| format!("blob-{}", x).into_bytes()), | ||
)), | ||
], | ||
) | ||
.unwrap() | ||
}) | ||
.collect(); | ||
|
||
let test_uri = path.to_str().unwrap(); | ||
let write_params = WriteParams { | ||
max_rows_per_file: file_size as usize, | ||
max_rows_per_group: batch_size as usize, | ||
mode, | ||
use_legacy_format: !use_v2, | ||
..Default::default() | ||
}; | ||
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); | ||
let ds = Dataset::write(reader, test_uri, Some(write_params)) | ||
.await | ||
.unwrap(); | ||
ds.object_store.inner.clone() | ||
} | ||
|
||
#[cfg(target_os = "linux")] | ||
criterion_group!( | ||
name=benches; | ||
config = Criterion::default() | ||
.significance_level(0.01) | ||
.sample_size(10000) | ||
.warm_up_time(Duration::from_secs_f32(3.0)) | ||
.with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); | ||
targets = bench_random_take); | ||
#[cfg(not(target_os = "linux"))] | ||
criterion_group!( | ||
name=benches; | ||
config = Criterion::default().significance_level(0.1).sample_size(10); | ||
targets = bench_random_take); | ||
criterion_main!(benches); |
Oops, something went wrong.