From 9b5c5a4dcb82abfd02589d949e9bc9ad2dad747c Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Mon, 14 Nov 2022 20:16:51 +0800 Subject: [PATCH] feat: read all bytes out before do async bench (#390) --- benchmarks/src/bin/parquet-reader.rs | 3 ++- benchmarks/src/parquet_bench.rs | 33 +++++++++++++--------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/benchmarks/src/bin/parquet-reader.rs b/benchmarks/src/bin/parquet-reader.rs index 62ea74ea07..3413a57943 100644 --- a/benchmarks/src/bin/parquet-reader.rs +++ b/benchmarks/src/bin/parquet-reader.rs @@ -6,12 +6,13 @@ use benchmarks::{ config::{self, BenchConfig}, parquet_bench::ParquetBench, }; +use env_logger::Env; static INIT_LOG: Once = Once::new(); pub fn init_bench() -> BenchConfig { INIT_LOG.call_once(|| { - env_logger::init(); + env_logger::from_env(Env::default().default_filter_or("info")).init(); }); config::bench_config_from_env() diff --git a/benchmarks/src/parquet_bench.rs b/benchmarks/src/parquet_bench.rs index 1ffcb26933..f15189fc16 100644 --- a/benchmarks/src/parquet_bench.rs +++ b/benchmarks/src/parquet_bench.rs @@ -2,11 +2,11 @@ //! Parquet bench. -use std::{sync::Arc, time::Instant}; +use std::{io::Cursor, sync::Arc, time::Instant}; use common_types::schema::Schema; use common_util::runtime::Runtime; -use futures::TryStreamExt; +use futures::StreamExt; use log::info; use object_store::{LocalFileSystem, ObjectStoreRef, Path}; use parquet::arrow::{ @@ -14,13 +14,11 @@ use parquet::arrow::{ }; use parquet_ext::{DataCacheRef, MetaCacheRef}; use table_engine::predicate::PredicateRef; -use tokio::fs::File; use crate::{config::SstBenchConfig, util}; pub struct ParquetBench { store: ObjectStoreRef, - store_path: String, pub sst_file_name: String, max_projections: usize, projection: Vec, @@ -50,7 +48,6 @@ impl ParquetBench { ParquetBench { store, - store_path: config.store_path, sst_file_name: config.sst_file_name, max_projections: config.max_projections, projection: Vec::new(), @@ -91,15 +88,15 @@ impl ParquetBench { self.runtime.block_on(async { let open_instant = Instant::now(); let get_result = self.store.get(&sst_path).await.unwrap(); - + let bytes = get_result.bytes().await.unwrap(); let open_cost = open_instant.elapsed(); let filter_begin_instant = Instant::now(); - let arrow_reader = - ParquetRecordBatchReaderBuilder::try_new(get_result.bytes().await.unwrap()) - .unwrap() - .build() - .unwrap(); + let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .with_batch_size(self.batch_size) + .build() + .unwrap(); let filter_cost = filter_begin_instant.elapsed(); let iter_begin_instant = Instant::now(); @@ -124,16 +121,16 @@ impl ParquetBench { } pub fn run_async_bench(&self) { + let sst_path = Path::from(self.sst_file_name.clone()); self.runtime.block_on(async { let open_instant = Instant::now(); - let file = File::open(format!("{}/{}", self.store_path, self.sst_file_name)) - .await - .expect("failed to open file"); - + let get_result = self.store.get(&sst_path).await.unwrap(); + let bytes = get_result.bytes().await.unwrap(); + let cursor = Cursor::new(bytes); let open_cost = open_instant.elapsed(); let filter_begin_instant = Instant::now(); - let stream = ParquetRecordBatchStreamBuilder::new(file) + let mut stream = ParquetRecordBatchStreamBuilder::new(cursor) .await .unwrap() .with_batch_size(self.batch_size) @@ -144,8 +141,8 @@ impl ParquetBench { let mut total_rows = 0; let mut batch_num = 0; let iter_begin_instant = Instant::now(); - for record_batch in stream.try_collect::>().await.unwrap() { - let num_rows = record_batch.num_rows(); + while let Some(record_batch) = stream.next().await { + let num_rows = record_batch.unwrap().num_rows(); total_rows += num_rows; batch_num += 1; }