
Improved flexibility of reading parquet (#820)
jorgecarleitao authored Feb 6, 2022
1 parent 4fbbd90 commit 3d528c9
Showing 3 changed files with 84 additions and 54 deletions.
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -39,7 +39,7 @@ async fn main() -> Result<()> {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks =
read::read_columns_async(factory, row_group, schema.fields.clone(), None).await?;
read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;

// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
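The comment above describes the two-step pattern this example follows: an IO-bounded fetch of the compressed columns, then a CPU-bounded deserialization. A hedged sketch of the whole example after the rename follows; only the `read_columns_many_async` call is taken from this commit, while the tokio/file plumbing, `read_metadata_async`, the schema helper, and the exact `factory` shape (hidden behind the fold in row_group.rs below) are assumptions that may differ from the crate version you use.

// Hedged sketch, not part of this commit: the async two-step read after the rename.
use arrow2::error::Result;
use arrow2::io::parquet::read;
use futures::future::BoxFuture;
use tokio::fs::File;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

#[tokio::main]
async fn main() -> Result<()> {
    let path = "example.parquet";

    // one fresh file handle per column read; the factory is cloned internally so the
    // columns of a row group can be fetched concurrently (assumed factory shape)
    let factory = move || -> BoxFuture<'static, std::io::Result<Compat<File>>> {
        Box::pin(async move { Ok(File::open(path).await?.compat()) })
    };

    // reading the metadata is IO-bounded and done once (assumed helper name)
    let mut reader = factory().await?;
    let metadata = read::read_metadata_async(&mut reader).await?;
    // assumed schema-inference helper; the exact name varies across arrow2 versions
    let schema = read::get_schema(&metadata)?;

    for row_group in &metadata.row_groups {
        // step 1 (IO-bounded): fetch the compressed column chunks of this row group
        let column_chunks =
            read::read_columns_many_async(factory.clone(), row_group, schema.fields.clone(), None)
                .await?;

        // step 2 (CPU-bounded): decompress and deserialize each field's chunks; in a real
        // application this belongs on a separate thread pool (e.g. `tokio_rayon`)
        for (field, mut iter) in schema.fields.iter().zip(column_chunks.into_iter()) {
            while let Some(array) = iter.next().transpose()? {
                println!("{}: {} rows", field.name, array.len());
            }
        }
    }
    Ok(())
}
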
4 changes: 2 additions & 2 deletions src/io/parquet/read/file.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::io::parquet::read::read_columns;
use crate::io::parquet::read::read_columns_many;
use crate::{
datatypes::Field,
error::{ArrowError, Result},
@@ -233,7 +233,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
}
self.current_group += 1;

let column_chunks = read_columns(
let column_chunks = read_columns_many(
&mut self.reader,
row_group,
self.schema.fields.clone(),
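On the synchronous side the same rename applies: `RowGroupReader` now calls `read_columns_many`. Below is a hedged sketch of calling it directly; `read_metadata` and its re-export path are assumptions, and the fields to read are taken as input rather than inferred from the file.

// Hedged sketch, not part of this commit: read whole row groups with the renamed
// `read_columns_many`; `read_metadata` is assumed to be re-exported by the same module.
use std::fs::File;

use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read;

fn read_row_groups(path: &str, fields: Vec<Field>) -> Result<()> {
    let mut reader = File::open(path)?;
    let metadata = read::read_metadata(&mut reader)?;

    for row_group in &metadata.row_groups {
        // one `ArrayIter` per requested field; `None` means a single chunk per row group
        let column_iters =
            read::read_columns_many(&mut reader, row_group, fields.clone(), None)?;

        for (field, mut iter) in fields.iter().zip(column_iters.into_iter()) {
            while let Some(array) = iter.next().transpose()? {
                println!("{}: {} rows", field.name, array.len());
            }
        }
    }
    Ok(())
}
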
132 changes: 81 additions & 51 deletions src/io/parquet/read/row_group.rs
@@ -100,24 +100,35 @@ pub(super) fn get_field_columns<'a>(
}

/// Reads all columns that are part of the parquet field `field_name`
pub(super) fn _read_columns<'a, R: Read + Seek>(
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
pub fn read_columns<'a, R: Read + Seek>(
reader: &mut R,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
get_field_columns(columns, field_name)
.into_iter()
.map(|meta| {
let (start, len) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;
let mut chunk = vec![0; len as usize];
reader.read_exact(&mut chunk)?;
Ok((meta, chunk))
})
.map(|meta| _read_single_column(reader, meta))
.collect()
}

async fn _read_single_column<'b, R, F>(
fn _read_single_column<'a, R>(
reader: &mut R,
meta: &'a ColumnChunkMetaData,
) -> Result<(&'a ColumnChunkMetaData, Vec<u8>)>
where
R: Read + Seek,
{
let (start, len) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;
let mut chunk = vec![0; len as usize];
reader.read_exact(&mut chunk)?;
Ok((meta, chunk))
}

async fn _read_single_column_async<'b, R, F>(
factory: F,
meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
@@ -134,7 +145,13 @@ where
}

/// Reads all columns that are part of the parquet field `field_name`
async fn _read_columns_async<
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
///
/// It does so asynchronously via a single `join_all` over all the necessary columns for
/// `field_name`.
pub async fn read_columns_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -146,54 +163,53 @@ async fn _read_columns_async<
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
let futures = get_field_columns(columns, field_name)
.into_iter()
.map(|meta| async { _read_single_column(factory.clone(), meta).await });
.map(|meta| async { _read_single_column_async(factory.clone(), meta).await });

try_join_all(futures).await
}

fn to_deserializers<'a>(
field_columns: Vec<Vec<(&ColumnChunkMetaData, Vec<u8>)>>,
fields: Vec<Field>,
row_group: &RowGroupMetaData,
/// Converts a vector of columns associated with the parquet field whose name is [`Field`]
/// to an iterator of [`Array`], [`ArrayIter`] of chunk size `chunk_size`.
pub fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
field: Field,
num_rows: usize,
chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'a>>> {
let chunk_size = chunk_size
.unwrap_or(usize::MAX)
.min(row_group.num_rows() as usize);
) -> Result<ArrayIter<'a>> {
let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);

field_columns
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|columns| {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
column_meta.num_values(),
column_meta.compression(),
column_meta.descriptor().clone(),
Arc::new(|_, _| true),
vec![],
);
(
BasicDecompressor::new(pages, vec![]),
column_meta.descriptor().type_(),
)
})
.unzip();
(columns, types)
.map(|(column_meta, chunk)| {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
column_meta.num_values(),
column_meta.compression(),
column_meta.descriptor().clone(),
Arc::new(|_, _| true),
vec![],
);
(
BasicDecompressor::new(pages, vec![]),
column_meta.descriptor().type_(),
)
})
.zip(fields.into_iter())
.map(|((columns, types), field)| column_iter_to_arrays(columns, types, field, chunk_size))
.collect()
.unzip();

column_iter_to_arrays(columns, types, field, chunk_size)
}

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
/// name matches `fields`'s names.
/// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top
/// level parquet fields whose name matches `fields`'s names.
///
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
pub fn read_columns<'a, R: Read + Seek>(
///
/// This operation is single-threaded. For readers with stronger invariants
/// (e.g. implement [`Clone`]) you can use [`read_columns`] to read multiple columns at once
/// and convert them to [`ArrayIter`] via [`to_deserializer`].
pub fn read_columns_many<'a, R: Read + Seek>(
reader: &mut R,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
@@ -203,10 +219,16 @@ pub fn read_columns<'a, R: Read + Seek>(
// This operation is IO-bounded `O(C)` where C is the number of columns in the row group
let field_columns = fields
.iter()
.map(|field| _read_columns(reader, row_group.columns(), &field.name))
.map(|field| read_columns(reader, row_group.columns(), &field.name))
.collect::<Result<Vec<_>>>()?;

to_deserializers(field_columns, fields, row_group, chunk_size)
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| {
to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
})
.collect()
}

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
@@ -216,8 +238,10 @@ pub fn read_columns<'a, R: Read + Seek>(
/// it reads all the columns to memory from the row group associated to the requested fields.
///
/// # Implementation
/// This function concurrently reads all columns.
pub async fn read_columns_async<
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
/// It does so asynchronously via `join_all`
pub async fn read_columns_many_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -230,9 +254,15 @@ pub async fn read_columns_async<
) -> Result<Vec<ArrayIter<'a>>> {
let futures = fields
.iter()
.map(|field| _read_columns_async(factory.clone(), row_group.columns(), &field.name));
.map(|field| read_columns_async(factory.clone(), row_group.columns(), &field.name));

let field_columns = try_join_all(futures).await?;

to_deserializers(field_columns, fields, row_group, chunk_size)
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| {
to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
})
.collect()
}
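
Taken together, the now-public `read_columns` and `to_deserializer` are what buy the extra flexibility: a caller can perform the IO-bounded fetch itself and hand the owned column bytes to `to_deserializer` on another thread. A hedged sketch of that split follows; the signatures of `read_columns` and `to_deserializer` come from this diff, while `read_metadata`, the re-export path, and the sequential loop standing in for a thread pool are assumptions.

// Hedged sketch, not part of this commit: split the IO-bounded fetch (`read_columns`)
// from the CPU-bounded decompression/deserialization (`to_deserializer`). Because
// `to_deserializer` receives owned bytes, the second loop could be moved to a rayon
// pool or another thread; it is kept sequential here for brevity.
use std::fs::File;

use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read;

fn read_fields_flexibly(path: &str, fields: Vec<Field>) -> Result<()> {
    let mut reader = File::open(path)?;
    let metadata = read::read_metadata(&mut reader)?;

    for row_group in &metadata.row_groups {
        // IO-bounded: one `read_columns` call per field, O(C) reads in total
        let field_columns = fields
            .iter()
            .map(|field| read::read_columns(&mut reader, row_group.columns(), &field.name))
            .collect::<Result<Vec<_>>>()?;

        let num_rows = row_group.num_rows() as usize;

        // CPU-bounded: decompress and deserialize each field's column chunks
        for (columns, field) in field_columns.into_iter().zip(fields.iter()) {
            let mut iter = read::to_deserializer(columns, field.clone(), num_rows, None)?;
            while let Some(array) = iter.next().transpose()? {
                println!("{}: {} rows", field.name, array.len());
            }
        }
    }
    Ok(())
}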
