Skip to content

Commit

Permalink
try next instead of collect
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Nov 14, 2022
1 parent bdd9b7b commit 9e3e9ee
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions benchmarks/src/parquet_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{io::Cursor, sync::Arc, time::Instant};

use common_types::schema::Schema;
use common_util::runtime::Runtime;
use futures::TryStreamExt;
use futures::StreamExt;
use log::info;
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use parquet::arrow::{
Expand All @@ -19,8 +19,6 @@ use crate::{config::SstBenchConfig, util};

pub struct ParquetBench {
store: ObjectStoreRef,
#[allow(dead_code)]
store_path: String,
pub sst_file_name: String,
max_projections: usize,
projection: Vec<usize>,
Expand Down Expand Up @@ -50,7 +48,6 @@ impl ParquetBench {

ParquetBench {
store,
store_path: config.store_path,
sst_file_name: config.sst_file_name,
max_projections: config.max_projections,
projection: Vec::new(),
Expand Down Expand Up @@ -97,6 +94,7 @@ impl ParquetBench {
let filter_begin_instant = Instant::now();
let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
.unwrap()
.with_batch_size(self.batch_size)
.build()
.unwrap();
let filter_cost = filter_begin_instant.elapsed();
Expand Down Expand Up @@ -132,7 +130,7 @@ impl ParquetBench {
let open_cost = open_instant.elapsed();

let filter_begin_instant = Instant::now();
let stream = ParquetRecordBatchStreamBuilder::new(cursor)
let mut stream = ParquetRecordBatchStreamBuilder::new(cursor)
.await
.unwrap()
.with_batch_size(self.batch_size)
Expand All @@ -143,8 +141,8 @@ impl ParquetBench {
let mut total_rows = 0;
let mut batch_num = 0;
let iter_begin_instant = Instant::now();
for record_batch in stream.try_collect::<Vec<_>>().await.unwrap() {
let num_rows = record_batch.num_rows();
while let Some(record_batch) = stream.next().await {
let num_rows = record_batch.unwrap().num_rows();
total_rows += num_rows;
batch_num += 1;
}
Expand Down

0 comments on commit 9e3e9ee

Please sign in to comment.