perf: add random take benchmark
chebbyChefNEQ committed Jul 28, 2024
1 parent fc7d78c commit 91ec7e2
Showing 4 changed files with 377 additions and 1 deletion.
174 changes: 174 additions & 0 deletions rust/lance-testing/src/object_store.rs
@@ -0,0 +1,174 @@
use std::{
fmt::{Debug, Display},
ops::Range,
sync::Arc,
time::Duration,
};

use futures::stream::BoxStream;
use lance_io::object_store::WrappingObjectStore;
use object_store::{
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
};
use rand::prelude::Distribution;

/// A trait that determines how long to sleep for a given path (and optional byte range).
pub trait SleepDeterminer: Display + Debug + Clone + Send + Sync + 'static {
fn sleep(&self, path: &Path, range: Option<GetRange>) -> Duration;
}

#[derive(Debug, Clone)]
pub struct NoSleep;

impl Display for NoSleep {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "NoSleep")
}
}

impl SleepDeterminer for NoSleep {
fn sleep(&self, _path: &Path, _range: Option<GetRange>) -> Duration {
Duration::ZERO
}
}

/// A sleep determiner that sleeps for a random amount of time drawn from a
/// Gaussian distribution. The arguments are the mean and standard deviation of
/// the distribution, in seconds.
#[derive(Debug, Clone)]
pub struct GaussianSleepGenerator(pub f32, pub f32);

impl Display for GaussianSleepGenerator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "GaussianSleepGenerator({}, {})", self.0, self.1)
    }
}

impl SleepDeterminer for GaussianSleepGenerator {
    fn sleep(&self, _path: &Path, _range: Option<GetRange>) -> Duration {
        let mut rng = rand::thread_rng();
        // `rand::distributions::Standard` yields a uniform sample in [0, 1), not a
        // normal one, so derive a standard-normal sample via the Box-Muller
        // transform instead (avoiding an extra `rand_distr` dependency).
        let uniform = rand::distributions::Uniform::new(f32::EPSILON, 1.0);
        let u1: f32 = uniform.sample(&mut rng);
        let u2: f32 = uniform.sample(&mut rng);
        let standard_normal = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f32::consts::PI * u2).cos();
        // Scale to the configured mean (self.0) and standard deviation (self.1),
        // clamping negative draws to zero.
        let sleep_duration = standard_normal * self.1 + self.0;
        if sleep_duration < 0.0 {
            Duration::ZERO
        } else {
            Duration::from_secs_f32(sleep_duration)
        }
    }
}

/// A sleep determiner that sleeps only if a coin flip (which can be unfair) comes
/// up heads. The first argument is the probability of sleeping; the second is the
/// determiner to delegate to when the flip succeeds.
#[derive(Debug, Clone)]
pub struct MaybeSleepByChance<S: SleepDeterminer>(pub f32, pub S);

impl<S: SleepDeterminer> Display for MaybeSleepByChance<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MaybeSleepByChance({}, {})", self.0, self.1)
}
}

impl<S: SleepDeterminer> SleepDeterminer for MaybeSleepByChance<S> {
    fn sleep(&self, path: &Path, range: Option<GetRange>) -> Duration {
        // Flip a (possibly biased) coin: delegate to the inner determiner with
        // probability `self.0`, otherwise skip sleeping entirely.
        let uniform = rand::distributions::Uniform::new(0.0, 1.0);
        let mut rng = rand::thread_rng();
        let flip: f32 = uniform.sample(&mut rng);
        if flip < self.0 {
            self.1.sleep(path, range)
        } else {
            Duration::ZERO
        }
    }
}
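
// Illustrative composition (an example, not part of the commit): the determiners
// nest, so a hook that sleeps ~50ms +/- 10ms on roughly half of all requests is:
//
//     let hook = MaybeSleepByChance(0.5, GaussianSleepGenerator(0.05, 0.01));
//     let pause = hook.sleep(&Path::from("data/example.lance"), None);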

/// An object store that simulates slow operations.
/// This is useful for testing the behavior of the system under slow object store
/// conditions. The sleep duration is determined by the `sleep_hook` function.
///
/// `sleep_hook` may also decide not to sleep at all by returning `Duration::ZERO`,
/// which is useful for ablation studies.
#[derive(Debug)]
pub struct SleepyObjectStore<S: SleepDeterminer> {
inner: Arc<dyn ObjectStore>,

sleep_hook: S,
}

impl<S: SleepDeterminer> Display for SleepyObjectStore<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "SleepyObjectStore(inner: {}, sleep_hook: {})",
            self.inner, self.sleep_hook
        )
    }
}

impl<S: SleepDeterminer> SleepyObjectStore<S> {
pub fn new(inner: Arc<dyn ObjectStore>, sleep_hook: S) -> Self {
Self { inner, sleep_hook }
}

async fn do_sleep(&self, path: &Path, range: Option<GetRange>) {
let sleep_duration = self.sleep_hook.sleep(path, range);
if sleep_duration != Duration::ZERO {
tokio::time::sleep(sleep_duration).await;
}
}
}

#[async_trait::async_trait]
impl<S: SleepDeterminer> ObjectStore for SleepyObjectStore<S> {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
self.inner.put_opts(location, payload, opts).await
}

async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
        // No sleep injected here for now: delaying multipart uploads would require
        // a custom `MultipartUpload`/stream implementation.
self.inner.put_multipart_opts(location, opts).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.do_sleep(location, options.range.clone()).await;
self.inner.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.inner.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.inner.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
}

#[derive(Debug)]
struct SleepyObjectStoreWrapper<S: SleepDeterminer>(pub S);

impl<S: SleepDeterminer> WrappingObjectStore for SleepyObjectStoreWrapper<S> {
fn wrap(&self, original: Arc<dyn ObjectStore>) -> Arc<dyn ObjectStore> {
Arc::new(SleepyObjectStore::new(original, self.0.clone()))
}
}
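
// Minimal usage sketch (illustrative; assumes only the `object_store` crate's
// in-memory backend): wrap any store so reads pause according to the hook.
//
//     let inner: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
//     let wrapper = SleepyObjectStoreWrapper(GaussianSleepGenerator(0.05, 0.01));
//     let slow_store = wrapper.wrap(inner);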
4 changes: 4 additions & 0 deletions rust/lance/Cargo.toml
@@ -129,3 +129,7 @@ harness = false
[[bench]]
name = "ivf_pq"
harness = false

[[bench]]
name = "take"
harness = false
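
With this entry registered, the benchmark can be run on its own via `cargo bench --bench take`.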
198 changes: 198 additions & 0 deletions rust/lance/benches/take.rs
@@ -0,0 +1,198 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_array::{
BinaryArray, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchIterator,
};
use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema};
use criterion::{criterion_group, criterion_main, Criterion};

use lance::{
arrow::FixedSizeListArrayExt,
dataset::{builder::DatasetBuilder, ProjectionRequest},
};
use lance_table::io::commit::RenameCommitHandler;
use object_store::ObjectStore;
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};
use rand::Rng;
use std::{sync::Arc, time::Duration};
use url::Url;

use lance::dataset::{Dataset, WriteMode, WriteParams};

/// Number of rows in each generated `RecordBatch` (also used as the write group size).
const BATCH_SIZE: u64 = 1024;

/// Generate `n` random row addresses: the upper 32 bits hold the fragment index
/// (raw id / `file_size`) and the lower 32 bits the row offset within that
/// fragment (raw id % `file_size`).
fn gen_ranges(num_rows: u64, file_size: u64, n: usize) -> Vec<u64> {
    let mut rng = rand::thread_rng();
    let mut ranges = Vec::with_capacity(n);
    for i in 0..n {
        ranges.push(rng.gen_range(1..num_rows));
        ranges[i] = ((ranges[i] / file_size) << 32) | (ranges[i] % file_size);
    }

    ranges
}
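
// Worked example (illustrative): with file_size = 1024, a raw row id of 2050
// falls in fragment 2 at offset 2, encoded as (2 << 32) | 2 = 0x2_0000_0002.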

fn bench_random_take(c: &mut Criterion) {
    // Default multi-threaded Tokio runtime to drive the async take calls.
let rt = tokio::runtime::Runtime::new().unwrap();
let num_batches = 1024;

for file_size in [1024 * 1024, 1024] {
let dataset = rt.block_on(create_dataset(
"memory://test.lance",
false,
num_batches,
file_size,
));
let schema = Arc::new(dataset.schema().clone());

for num_rows in [1, 10, 100, 1000] {
c.bench_function(&format!(
"V1 Random Take ({file_size} file size, {num_batches} batches, {num_rows} rows per take)"
), |b| {
b.to_async(&rt).iter(|| async {
let rows = gen_ranges(num_batches as u64 * BATCH_SIZE, file_size as u64, num_rows);
let batch = dataset
.take_rows(&rows, ProjectionRequest::Schema(schema.clone()))
.await
                .unwrap_or_else(|err| panic!("take_rows failed for rows {:?}: {}", rows, err));
assert_eq!(batch.num_rows(), num_rows);
})
});
}

let dataset = rt.block_on(create_dataset(
"memory://test.lance",
true,
num_batches,
file_size,
));
let schema = Arc::new(dataset.schema().clone());
for num_rows in [1, 10, 100, 1000] {
c.bench_function(&format!(
"V2 Random Take ({file_size} file size, {num_batches} batches, {num_rows} rows per take)"
), |b| {
b.to_async(&rt).iter(|| async {
let batch = dataset
.take_rows(&gen_ranges(num_batches as u64 * BATCH_SIZE, file_size as u64, num_rows), ProjectionRequest::Schema(schema.clone()))
.await
.unwrap();
assert_eq!(batch.num_rows(), num_rows);
})
});
}
}
}

async fn create_dataset(path: &str, use_v2: bool, num_batches: i32, file_size: i32) -> Dataset {
let store = create_file(
std::path::Path::new(path),
WriteMode::Create,
use_v2,
num_batches,
file_size,
)
.await;

DatasetBuilder::from_uri(path)
.with_object_store(
store,
Url::parse(path).unwrap(),
Arc::new(RenameCommitHandler),
)
.load()
.await
.unwrap()
}

async fn create_file(
path: &std::path::Path,
mode: WriteMode,
use_v2: bool,
num_batches: i32,
file_size: i32,
) -> Arc<dyn ObjectStore> {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("f", DataType::Float32, false),
Field::new("s", DataType::Binary, false),
Field::new(
"fsl",
DataType::FixedSizeList(
FieldRef::new(Field::new("item", DataType::Float32, true)),
2,
),
false,
),
Field::new("blob", DataType::Binary, false),
]));
let batch_size = BATCH_SIZE as i32;
let batches: Vec<RecordBatch> = (0..num_batches)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(
i * batch_size..(i + 1) * batch_size,
)),
Arc::new(Float32Array::from_iter_values(
(i * batch_size..(i + 1) * batch_size)
.map(|x| x as f32)
.collect::<Vec<_>>(),
)),
Arc::new(BinaryArray::from_iter_values(
(i * batch_size..(i + 1) * batch_size)
.map(|x| format!("blob-{}", x).into_bytes()),
)),
Arc::new(
FixedSizeListArray::try_new_from_values(
Float32Array::from_iter_values(
(i * batch_size..(i + 2) * batch_size)
.map(|x| (batch_size + (x - batch_size) / 2) as f32),
),
2,
)
.unwrap(),
),
Arc::new(BinaryArray::from_iter_values(
(i * batch_size..(i + 1) * batch_size)
.map(|x| format!("blob-{}", x).into_bytes()),
)),
],
)
.unwrap()
})
.collect();

let test_uri = path.to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: file_size as usize,
max_rows_per_group: batch_size as usize,
mode,
use_legacy_format: !use_v2,
..Default::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let ds = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
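    // Hand back the underlying store so create_dataset can reopen the dataset
    // against the same in-memory backend.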
ds.object_store.inner.clone()
}

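// On Linux, attach pprof's flamegraph profiler to the benchmark group; other
// platforms fall back to a lighter Criterion configuration.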
#[cfg(target_os = "linux")]
criterion_group!(
name=benches;
config = Criterion::default()
.significance_level(0.01)
.sample_size(10000)
.warm_up_time(Duration::from_secs_f32(3.0))
.with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_random_take);
#[cfg(not(target_os = "linux"))]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10);
targets = bench_random_take);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion rust/lance/src/dataset.rs
@@ -86,7 +86,7 @@ pub(crate) const DEFAULT_METADATA_CACHE_SIZE: usize = 256;
/// Lance Dataset
#[derive(Debug, Clone)]
pub struct Dataset {
pub(crate) object_store: Arc<ObjectStore>,
pub object_store: Arc<ObjectStore>,
pub(crate) commit_handler: Arc<dyn CommitHandler>,
/// Uri of the dataset.
///
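
The `object_store` field is made public so the new benchmark's `create_file` helper can return `ds.object_store.inner.clone()` and hand the same in-memory store back to `DatasetBuilder::with_object_store`.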
