diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs
index 86f064da20d..e9ac530bb89 100644
--- a/examples/parquet_read_async.rs
+++ b/examples/parquet_read_async.rs
@@ -39,7 +39,7 @@ async fn main() -> Result<()> {
         // A row group is consumed in two steps: the first step is to read the (compressed)
         // columns into memory, which is IO-bounded.
         let column_chunks =
-            read::read_columns_async(factory, row_group, schema.fields.clone(), None).await?;
+            read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;
 
         // the second step is to iterate over the columns in chunks.
         // this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 474fbc89200..126253e6b2e 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::Schema;
-use crate::io::parquet::read::read_columns;
+use crate::io::parquet::read::read_columns_many;
 use crate::{
     datatypes::Field,
     error::{ArrowError, Result},
@@ -233,7 +233,7 @@ impl RowGroupReader {
         }
         self.current_group += 1;
 
-        let column_chunks = read_columns(
+        let column_chunks = read_columns_many(
             &mut self.reader,
             row_group,
             self.schema.fields.clone(),
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index 23ac8215a39..587544b6889 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -100,24 +100,35 @@ pub(super) fn get_field_columns<'a>(
 }
 
 /// Reads all columns that are part of the parquet field `field_name`
-pub(super) fn _read_columns<'a, R: Read + Seek>(
+/// # Implementation
+/// This operation is IO-bounded `O(C)` where C is the number of columns associated with
+/// the field (one for non-nested types).
+pub fn read_columns<'a, R: Read + Seek>(
     reader: &mut R,
     columns: &'a [ColumnChunkMetaData],
     field_name: &str,
 ) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
     get_field_columns(columns, field_name)
         .into_iter()
-        .map(|meta| {
-            let (start, len) = meta.byte_range();
-            reader.seek(std::io::SeekFrom::Start(start))?;
-            let mut chunk = vec![0; len as usize];
-            reader.read_exact(&mut chunk)?;
-            Ok((meta, chunk))
-        })
+        .map(|meta| _read_single_column(reader, meta))
        .collect()
 }
 
-async fn _read_single_column<'b, R, F>(
+fn _read_single_column<'a, R>(
+    reader: &mut R,
+    meta: &'a ColumnChunkMetaData,
+) -> Result<(&'a ColumnChunkMetaData, Vec<u8>)>
+where
+    R: Read + Seek,
+{
+    let (start, len) = meta.byte_range();
+    reader.seek(std::io::SeekFrom::Start(start))?;
+    let mut chunk = vec![0; len as usize];
+    reader.read_exact(&mut chunk)?;
+    Ok((meta, chunk))
+}
+
+async fn _read_single_column_async<'b, R, F>(
     factory: F,
     meta: &ColumnChunkMetaData,
 ) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
@@ -134,7 +145,13 @@ where
 }
 
 /// Reads all columns that are part of the parquet field `field_name`
-async fn _read_columns_async<
+/// # Implementation
+/// This operation is IO-bounded `O(C)` where C is the number of columns associated with
+/// the field (one for non-nested types).
+///
+/// It does so asynchronously via a single `join_all` over all the necessary columns for
+/// `field_name`.
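+///
+/// # Example
+/// A minimal sketch, assuming `new_reader` is a hypothetical helper that returns a
+/// `BoxFuture` resolving to an async reader of a `"data.parquet"` placeholder file:
+/// ```ignore
+/// // the factory re-opens the file, so every column chunk gets its own reader
+/// let factory = || new_reader("data.parquet");
+/// let columns = read_columns_async(factory, row_group.columns(), "a").await?;
+/// ```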
+pub async fn read_columns_async<
     'a,
     'b,
     R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -146,54 +163,53 @@
 ) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
     let futures = get_field_columns(columns, field_name)
         .into_iter()
-        .map(|meta| async { _read_single_column(factory.clone(), meta).await });
+        .map(|meta| async { _read_single_column_async(factory.clone(), meta).await });
 
     try_join_all(futures).await
 }
 
-fn to_deserializers<'a>(
-    field_columns: Vec<Vec<(&ColumnChunkMetaData, Vec<u8>)>>,
-    fields: Vec<Field>,
-    row_group: &RowGroupMetaData,
+/// Converts a vector of columns associated with a single parquet field ([`Field`])
+/// into an iterator of [`Array`] ([`ArrayIter`]) with chunks of size `chunk_size`.
+pub fn to_deserializer<'a>(
+    columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
+    field: Field,
+    num_rows: usize,
     chunk_size: Option<usize>,
-) -> Result<Vec<ArrayIter<'a>>> {
-    let chunk_size = chunk_size
-        .unwrap_or(usize::MAX)
-        .min(row_group.num_rows() as usize);
+) -> Result<ArrayIter<'a>> {
+    let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);
 
-    field_columns
+    let (columns, types): (Vec<_>, Vec<_>) = columns
         .into_iter()
-        .map(|columns| {
-            let (columns, types): (Vec<_>, Vec<_>) = columns
-                .into_iter()
-                .map(|(column_meta, chunk)| {
-                    let pages = PageIterator::new(
-                        std::io::Cursor::new(chunk),
-                        column_meta.num_values(),
-                        column_meta.compression(),
-                        column_meta.descriptor().clone(),
-                        Arc::new(|_, _| true),
-                        vec![],
-                    );
-                    (
-                        BasicDecompressor::new(pages, vec![]),
-                        column_meta.descriptor().type_(),
-                    )
-                })
-                .unzip();
-            (columns, types)
+        .map(|(column_meta, chunk)| {
+            let pages = PageIterator::new(
+                std::io::Cursor::new(chunk),
+                column_meta.num_values(),
+                column_meta.compression(),
+                column_meta.descriptor().clone(),
+                Arc::new(|_, _| true),
+                vec![],
+            );
+            (
+                BasicDecompressor::new(pages, vec![]),
+                column_meta.descriptor().type_(),
+            )
         })
-        .zip(fields.into_iter())
-        .map(|((columns, types), field)| column_iter_to_arrays(columns, types, field, chunk_size))
-        .collect()
+        .unzip();
+
+    column_iter_to_arrays(columns, types, field, chunk_size)
 }
 
-/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
-/// name matches `fields`'s names.
+/// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top
+/// level parquet fields whose names match those in `fields`.
 ///
+/// # Implementation
 /// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
 /// it reads all the columns to memory from the row group associated to the requested fields.
-pub fn read_columns<'a, R: Read + Seek>(
+///
+/// This operation is single-threaded. For readers with stronger invariants
+/// (e.g. those that implement [`Clone`]) you can use [`read_columns`] to read multiple
+/// columns at once and convert them to [`ArrayIter`] via [`to_deserializer`].
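+///
+/// # Example
+/// A minimal sketch, assuming `metadata` came from `read_metadata` and `fields`
+/// holds the [`Field`]s to read (`"data.parquet"` is a placeholder path):
+/// ```ignore
+/// let mut reader = std::fs::File::open("data.parquet")?;
+/// let row_group = &metadata.row_groups[0];
+/// // one lazy [`ArrayIter`] per field; chunks are in memory but not yet deserialized
+/// let iters = read_columns_many(&mut reader, row_group, fields, None)?;
+/// ```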
+pub fn read_columns_many<'a, R: Read + Seek>(
     reader: &mut R,
     row_group: &RowGroupMetaData,
     fields: Vec<Field>,
     chunk_size: Option<usize>,
@@ -203,10 +219,16 @@ pub fn read_columns<'a, R: Read + Seek>(
     // This operation is IO-bounded `O(C)` where C is the number of columns in the row group
     let field_columns = fields
         .iter()
-        .map(|field| _read_columns(reader, row_group.columns(), &field.name))
+        .map(|field| read_columns(reader, row_group.columns(), &field.name))
         .collect::<Result<Vec<_>>>()?;
 
-    to_deserializers(field_columns, fields, row_group, chunk_size)
+    field_columns
+        .into_iter()
+        .zip(fields.into_iter())
+        .map(|(columns, field)| {
+            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
+        })
+        .collect()
 }
 
 /// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
@@ -216,8 +238,10 @@ pub fn read_columns<'a, R: Read + Seek>(
 /// it reads all the columns to memory from the row group associated to the requested fields.
 ///
 /// # Implementation
-/// This function concurrently reads all columns.
-pub async fn read_columns_async<
+/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
+/// it reads all the columns to memory from the row group associated to the requested fields.
+/// It does so asynchronously via `join_all`.
+pub async fn read_columns_many_async<
     'a,
     'b,
     R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -230,9 +254,15 @@
 ) -> Result<Vec<ArrayIter<'a>>> {
     let futures = fields
         .iter()
-        .map(|field| _read_columns_async(factory.clone(), row_group.columns(), &field.name));
+        .map(|field| read_columns_async(factory.clone(), row_group.columns(), &field.name));
 
     let field_columns = try_join_all(futures).await?;
-    to_deserializers(field_columns, fields, row_group, chunk_size)
+    field_columns
+        .into_iter()
+        .zip(fields.into_iter())
+        .map(|(columns, field)| {
+            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
+        })
+        .collect()
 }
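
Usage sketch: the point of splitting `read_columns_many` into the public `read_columns` + `to_deserializer` pair is that callers can parallelize the IO-bounded step themselves. The code below is a hedged illustration, assuming `rayon`, a placeholder `"data.parquet"` path, and that `read_columns`, `to_deserializer`, and `RowGroupMetaData` are all reachable through `arrow2::io::parquet::read` (the exact re-exports may differ):

```rust
use rayon::prelude::*;

use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{read_columns, to_deserializer, RowGroupMetaData};

/// Reads `fields` from one row group with one IO task per field, then
/// deserializes the columns sequentially (CPU-bound).
fn deserialize_parallel(row_group: &RowGroupMetaData, fields: Vec<Field>) -> Result<()> {
    // IO-bounded step: each rayon task opens its own file handle and reads one
    // field's (compressed) column chunks into memory.
    let field_columns = fields
        .par_iter()
        .map(|field| {
            let mut reader = std::fs::File::open("data.parquet")?;
            read_columns(&mut reader, row_group.columns(), &field.name)
        })
        .collect::<Result<Vec<_>>>()?;

    // CPU-bounded step: decompress and decode each field into chunks of `Array`.
    for (columns, field) in field_columns.into_iter().zip(fields.into_iter()) {
        for maybe_array in to_deserializer(columns, field, row_group.num_rows() as usize, None)? {
            println!("{} rows", maybe_array?.len());
        }
    }
    Ok(())
}
```

Each rayon task performs only IO; `to_deserializer` defers decompression and decoding to iteration time, so the CPU-bound loop can likewise be moved onto a thread pool.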