Skip to content

Commit

Permalink
feat: read all bytes out before do async bench (apache#390)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 authored Nov 14, 2022
1 parent 73ace16 commit 9b5c5a4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
3 changes: 2 additions & 1 deletion benchmarks/src/bin/parquet-reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use benchmarks::{
config::{self, BenchConfig},
parquet_bench::ParquetBench,
};
use env_logger::Env;

static INIT_LOG: Once = Once::new();

pub fn init_bench() -> BenchConfig {
INIT_LOG.call_once(|| {
env_logger::init();
env_logger::from_env(Env::default().default_filter_or("info")).init();
});

config::bench_config_from_env()
Expand Down
33 changes: 15 additions & 18 deletions benchmarks/src/parquet_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@

//! Parquet bench.

use std::{sync::Arc, time::Instant};
use std::{io::Cursor, sync::Arc, time::Instant};

use common_types::schema::Schema;
use common_util::runtime::Runtime;
use futures::TryStreamExt;
use futures::StreamExt;
use log::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ParquetRecordBatchStreamBuilder,
};
use parquet_ext::{DataCacheRef, MetaCacheRef};
use table_engine::predicate::PredicateRef;
use tokio::fs::File;

use crate::{config::SstBenchConfig, util};

pub struct ParquetBench {
store: ObjectStoreRef,
store_path: String,
pub sst_file_name: String,
max_projections: usize,
projection: Vec<usize>,
Expand Down Expand Up @@ -50,7 +48,6 @@ impl ParquetBench {

ParquetBench {
store,
store_path: config.store_path,
sst_file_name: config.sst_file_name,
max_projections: config.max_projections,
projection: Vec::new(),
Expand Down Expand Up @@ -91,15 +88,15 @@ impl ParquetBench {
self.runtime.block_on(async {
let open_instant = Instant::now();
let get_result = self.store.get(&sst_path).await.unwrap();

let bytes = get_result.bytes().await.unwrap();
let open_cost = open_instant.elapsed();

let filter_begin_instant = Instant::now();
let arrow_reader =
ParquetRecordBatchReaderBuilder::try_new(get_result.bytes().await.unwrap())
.unwrap()
.build()
.unwrap();
let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
.unwrap()
.with_batch_size(self.batch_size)
.build()
.unwrap();
let filter_cost = filter_begin_instant.elapsed();

let iter_begin_instant = Instant::now();
Expand All @@ -124,16 +121,16 @@ impl ParquetBench {
}

pub fn run_async_bench(&self) {
let sst_path = Path::from(self.sst_file_name.clone());
self.runtime.block_on(async {
let open_instant = Instant::now();
let file = File::open(format!("{}/{}", self.store_path, self.sst_file_name))
.await
.expect("failed to open file");

let get_result = self.store.get(&sst_path).await.unwrap();
let bytes = get_result.bytes().await.unwrap();
let cursor = Cursor::new(bytes);
let open_cost = open_instant.elapsed();

let filter_begin_instant = Instant::now();
let stream = ParquetRecordBatchStreamBuilder::new(file)
let mut stream = ParquetRecordBatchStreamBuilder::new(cursor)
.await
.unwrap()
.with_batch_size(self.batch_size)
Expand All @@ -144,8 +141,8 @@ impl ParquetBench {
let mut total_rows = 0;
let mut batch_num = 0;
let iter_begin_instant = Instant::now();
for record_batch in stream.try_collect::<Vec<_>>().await.unwrap() {
let num_rows = record_batch.num_rows();
while let Some(record_batch) = stream.next().await {
let num_rows = record_batch.unwrap().num_rows();
total_rows += num_rows;
batch_num += 1;
}
Expand Down

0 comments on commit 9b5c5a4

Please sign in to comment.