Bench: Add parallel_read bench #100

Merged · merged 1 commit on Mar 4, 2022

benches/ops/ops.rs: 52 changes (42 additions & 10 deletions)

@@ -12,13 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::io::SeekFrom;
-
 use criterion::BenchmarkId;
 use criterion::Criterion;
+use futures::future::join_all;
 use futures::io;
 use futures::io::BufReader;
-use futures::AsyncSeekExt;
 use opendal::Operator;
 use opendal_test::services::fs;
 use opendal_test::services::s3;
@@ -71,13 +69,15 @@ pub fn bench(c: &mut Criterion) {
                     .iter(|| bench_buf_read(input.0.clone(), input.1))
             },
         );
+
+        group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64));
         group.bench_with_input(
-            BenchmarkId::new("seek_read", &path),
+            BenchmarkId::new("range_read", &path),
             &(op.clone(), &path),
             |b, input| {
-                let pos = rng.gen_range(0..TOTAL_SIZE - BATCH_SIZE) as u64;
+                let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64;
                 b.to_async(&runtime)
-                    .iter(|| bench_seek_read(input.0.clone(), input.1, pos))
+                    .iter(|| bench_range_read(input.0.clone(), input.1, pos))
             },
         );
         group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64));
@@ -99,6 +99,40 @@ pub fn bench(c: &mut Criterion) {
                     .iter(|| bench_write(input.0.clone(), input.1, input.2.clone()))
             },
         );
+
+        // runtime
+        const RUNTIME_THREAD: usize = 4;
+        let mut builder = tokio::runtime::Builder::new_multi_thread();
+        builder.enable_all().worker_threads(RUNTIME_THREAD);
+
+        let runtime = builder.build().unwrap();
+        for parallel in [2, 4, 6, 8, 10, 12, 16] {
+            group.throughput(criterion::Throughput::Bytes(
+                parallel as u64 * TOTAL_SIZE as u64 / 2,
+            ));
+            group.bench_with_input(
+                BenchmarkId::new(&format!("parallel_read_{}", parallel), &path),
+                &(op.clone(), &path, content.clone()),
+                |b, input| {
+                    let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64;
+                    b.to_async(&runtime).iter(|| {
+                        let futures = (0..parallel)
+                            .map(|_| async {
+                                bench_range_read(input.0.clone(), input.1, pos).await;
+                                let mut d = 0;
+                                // mock same little cpu work
+                                for c in pos..pos + 100u64 {
+                                    d += c & 0x1f1f1f1f + c % 256;
+                                }
+                                let _ = d;
+                            })
+                            .collect::<Vec<_>>();
+                        join_all(futures)
+                    })
+                },
+            );
+        }
+
         group.finish();
     }
 }
@@ -115,10 +149,8 @@ pub async fn bench_read(op: Operator, path: &str) {
     io::copy(&mut r, &mut io::sink()).await.unwrap();
 }

-pub async fn bench_seek_read(op: Operator, path: &str, pos: u64) {
-    let mut r = op.object(path).limited_reader(TOTAL_SIZE as u64);
-    r.seek(SeekFrom::Start(pos)).await.expect("seek");
-    r.seek(SeekFrom::Start(0)).await.expect("seek");
+pub async fn bench_range_read(op: Operator, path: &str, pos: u64) {
+    let mut r = op.object(path).range_reader(pos, (TOTAL_SIZE / 2) as u64);
     io::copy(&mut r, &mut io::sink()).await.unwrap();
 }
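
For reference, the concurrency pattern the new parallel_read bench exercises can be sketched on its own. The sketch below reuses only calls that already appear in the diff (Operator::object, range_reader, futures::io::copy into a sink, join_all, and tokio's multi-thread runtime builder); the helper name parallel_range_read, the per-task Operator clone, and the concrete worker, parallelism, and size values are illustrative, not part of the PR.

use futures::future::join_all;
use futures::io;
use opendal::Operator;

// Issue `parallel` concurrent range reads of `len` bytes starting at `pos`,
// draining each into a sink, which is what one bench iteration measures.
async fn parallel_range_read(op: Operator, path: &str, parallel: usize, pos: u64, len: u64) {
    let mut futures = Vec::with_capacity(parallel);
    for _ in 0..parallel {
        // Each task gets its own Operator handle and owned path, mirroring
        // the op.clone() / input.0.clone() calls in the bench itself.
        let op = op.clone();
        let path = path.to_string();
        futures.push(async move {
            let mut r = op.object(&path).range_reader(pos, len);
            io::copy(&mut r, &mut io::sink()).await.unwrap();
        });
    }
    // Await all reads concurrently, as the bench does inside `iter`.
    join_all(futures).await;
}

fn main() {
    // Same runtime shape as the bench: a multi-threaded tokio runtime with 4 workers.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(4)
        .build()
        .unwrap();
    // An Operator would come from a configured backend (the bench uses the
    // opendal_test fs and s3 helpers); constructing one is omitted here.
    // runtime.block_on(parallel_range_read(op, "path/to/object", 8, 0, 4 * 1024 * 1024));
    let _ = runtime;
}

Because each iteration drains parallel * TOTAL_SIZE / 2 bytes in total, the bench sets Throughput::Bytes to that product, so criterion reports aggregate bytes per second across the concurrent reads rather than per-read throughput.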