This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved flexibility of reading parquet #820

Merged · 2 commits · Feb 6, 2022
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -39,7 +39,7 @@ async fn main() -> Result<()> {
// A row group is consumed in two steps: the first step is to read the (compressed)
// columns into memory, which is IO-bounded.
let column_chunks =
read::read_columns_async(factory, row_group, schema.fields.clone(), None).await?;
read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;

// the second step is to iterate over the columns in chunks.
// this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
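For orientation, a short hedged sketch of the example around this hunk: `factory` is the example's reader factory (defined earlier in the file, not shown in this hunk), and the second, CPU-bounded step is only summarized in a comment.

```rust
// Step 1 (IO-bounded): fetch the (compressed) columns of this row group; the
// function is now called `read_columns_many_async`.
let column_chunks =
    read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;

// Step 2 (CPU-bounded): deserializing `column_chunks` into arrays should be offloaded
// to a separate thread pool (e.g. `tokio_rayon`) so the async executor is not blocked.
```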
4 changes: 2 additions & 2 deletions src/io/parquet/read/file.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::io::parquet::read::read_columns;
use crate::io::parquet::read::read_columns_many;
use crate::{
datatypes::Field,
error::{ArrowError, Result},
@@ -233,7 +233,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
}
self.current_group += 1;

let column_chunks = read_columns(
let column_chunks = read_columns_many(
&mut self.reader,
row_group,
self.schema.fields.clone(),
132 changes: 81 additions & 51 deletions src/io/parquet/read/row_group.rs
@@ -100,24 +100,35 @@ pub(super) fn get_field_columns<'a>(
}

/// Reads all columns that are part of the parquet field `field_name`
pub(super) fn _read_columns<'a, R: Read + Seek>(
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
pub fn read_columns<'a, R: Read + Seek>(
reader: &mut R,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
get_field_columns(columns, field_name)
.into_iter()
.map(|meta| {
let (start, len) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;
let mut chunk = vec![0; len as usize];
reader.read_exact(&mut chunk)?;
Ok((meta, chunk))
})
.map(|meta| _read_single_column(reader, meta))
.collect()
}
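For context, a minimal sketch of how a caller could use the now-public `read_columns`. The file path and the field name `"values"` are assumptions for illustration, and the re-export paths are assumed to follow the crate's usual `io::parquet::read` layout.

```rust
use arrow2::error::Result;
use arrow2::io::parquet::read;

fn read_one_field() -> Result<()> {
    let mut reader = std::fs::File::open("example.parquet")?;
    let metadata = read::read_metadata(&mut reader)?;
    let row_group = &metadata.row_groups[0];

    // IO-bounded: one seek + contiguous read per column chunk belonging to "values"
    // (a single chunk for non-nested types).
    let columns = read::read_columns(&mut reader, row_group.columns(), "values")?;
    println!("read {} column chunk(s)", columns.len());
    Ok(())
}
```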

async fn _read_single_column<'b, R, F>(
fn _read_single_column<'a, R>(
reader: &mut R,
meta: &'a ColumnChunkMetaData,
) -> Result<(&'a ColumnChunkMetaData, Vec<u8>)>
where
R: Read + Seek,
{
let (start, len) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;
let mut chunk = vec![0; len as usize];
reader.read_exact(&mut chunk)?;
Ok((meta, chunk))
}

async fn _read_single_column_async<'b, R, F>(
factory: F,
meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
@@ -134,7 +145,13 @@ where
}

/// Reads all columns that are part of the parquet field `field_name`
async fn _read_columns_async<
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
///
/// It does so asynchronously via a single `join_all` over all the necessary columns for
/// `field_name`.
pub async fn read_columns_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -146,54 +163,53 @@ async fn _read_columns_async<
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
let futures = get_field_columns(columns, field_name)
.into_iter()
.map(|meta| async { _read_single_column(factory.clone(), meta).await });
.map(|meta| async { _read_single_column_async(factory.clone(), meta).await });

try_join_all(futures).await
}
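A rough usage fragment for the now-public `read_columns_async`; the concrete type of `factory` (a cloneable closure that opens a fresh async reader per column) is outside this hunk, and the field name `"values"` is hypothetical.

```rust
// Fragment: fetch the column chunks of the (hypothetical) field "values"; each chunk is
// read from its own reader produced by `factory`, and all reads are awaited together.
let columns = read_columns_async(factory.clone(), row_group.columns(), "values").await?;
```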

fn to_deserializers<'a>(
field_columns: Vec<Vec<(&ColumnChunkMetaData, Vec<u8>)>>,
fields: Vec<Field>,
row_group: &RowGroupMetaData,
/// Converts a vector of columns associated with the parquet field whose name is [`Field`]
/// to an iterator of [`Array`], [`ArrayIter`] of chunk size `chunk_size`.
pub fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
field: Field,
num_rows: usize,
chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'a>>> {
let chunk_size = chunk_size
.unwrap_or(usize::MAX)
.min(row_group.num_rows() as usize);
) -> Result<ArrayIter<'a>> {
let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);

field_columns
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|columns| {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
column_meta.num_values(),
column_meta.compression(),
column_meta.descriptor().clone(),
Arc::new(|_, _| true),
vec![],
);
(
BasicDecompressor::new(pages, vec![]),
column_meta.descriptor().type_(),
)
})
.unzip();
(columns, types)
.map(|(column_meta, chunk)| {
let pages = PageIterator::new(
std::io::Cursor::new(chunk),
column_meta.num_values(),
column_meta.compression(),
column_meta.descriptor().clone(),
Arc::new(|_, _| true),
vec![],
);
(
BasicDecompressor::new(pages, vec![]),
column_meta.descriptor().type_(),
)
})
.zip(fields.into_iter())
.map(|((columns, types), field)| column_iter_to_arrays(columns, types, field, chunk_size))
.collect()
.unzip();

column_iter_to_arrays(columns, types, field, chunk_size)
}
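A hedged sketch of `to_deserializer` from the caller's side: `columns` could come from `read_columns` or `read_columns_async`, the chunk size of 1024 is arbitrary, and the import paths are assumptions.

```rust
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{to_deserializer, ColumnChunkMetaData};

fn deserialize_field(
    columns: Vec<(&ColumnChunkMetaData, Vec<u8>)>,
    field: Field,
    num_rows: usize,
) -> Result<()> {
    // Cap each emitted array at 1024 rows (or fewer, if the row group is smaller).
    let mut arrays = to_deserializer(columns, field, num_rows, Some(1024))?;

    // CPU-bounded: each `next()` decompresses and deserializes pages into one `Array`.
    while let Some(array) = arrays.next().transpose()? {
        println!("deserialized {} rows", array.len());
    }
    Ok(())
}
```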

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
/// name matches `fields`'s names.
/// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top
/// level parquet fields whose name matches `fields`'s names.
///
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
pub fn read_columns<'a, R: Read + Seek>(
///
/// This operation is single-threaded. For readers with stronger invariants
/// (e.g. implement [`Clone`]) you can use [`read_columns`] to read multiple columns at once
/// and convert them to [`ArrayIter`] via [`to_deserializer`].
pub fn read_columns_many<'a, R: Read + Seek>(
reader: &mut R,
row_group: &RowGroupMetaData,
fields: Vec<Field>,
@@ -203,10 +219,16 @@ pub fn read_columns<'a, R: Read + Seek>(
// This operation is IO-bounded `O(C)` where C is the number of columns in the row group
let field_columns = fields
.iter()
.map(|field| _read_columns(reader, row_group.columns(), &field.name))
.map(|field| read_columns(reader, row_group.columns(), &field.name))
.collect::<Result<Vec<_>>>()?;

to_deserializers(field_columns, fields, row_group, chunk_size)
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| {
to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
})
.collect()
}
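To illustrate the flexibility this split buys, a hedged sketch of the same decomposition `read_columns_many` performs, spelled out so a caller can decide where to introduce parallelism (e.g. one thread per field when the reader can be cloned). The function name `read_row_group` and the re-export paths are assumptions.

```rust
use std::io::{Read, Seek};

use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::parquet::read::{read_columns, to_deserializer, ArrayIter, RowGroupMetaData};

fn read_row_group<'a, R: Read + Seek>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    fields: Vec<Field>,
    chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'a>>> {
    let num_rows = row_group.num_rows() as usize;
    let mut iters = Vec::with_capacity(fields.len());
    for field in fields {
        // IO-bounded: read this field's (compressed) column chunks into memory.
        let columns = read_columns(reader, row_group.columns(), &field.name)?;
        // CPU-bounded setup: wrap them into an iterator of arrays of `chunk_size` rows.
        iters.push(to_deserializer(columns, field, num_rows, chunk_size)?);
    }
    Ok(iters)
}
```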

/// Returns a vector of iterators of [`Array`] corresponding to the top level parquet fields whose
@@ -216,8 +238,10 @@ pub fn read_columns<'a, R: Read + Seek>(
/// it reads all the columns to memory from the row group associated to the requested fields.
///
/// # Implementation
/// This function concurrently reads all columns.
pub async fn read_columns_async<
/// This operation is IO-bounded `O(C)` where C is the number of columns in the row group -
/// it reads all the columns to memory from the row group associated to the requested fields.
/// It does so asynchronously via `join_all`
pub async fn read_columns_many_async<
'a,
'b,
R: AsyncRead + AsyncSeek + Send + Unpin,
@@ -230,9 +254,15 @@ pub async fn read_columns_async<
) -> Result<Vec<ArrayIter<'a>>> {
let futures = fields
.iter()
.map(|field| _read_columns_async(factory.clone(), row_group.columns(), &field.name));
.map(|field| read_columns_async(factory.clone(), row_group.columns(), &field.name));

let field_columns = try_join_all(futures).await?;

to_deserializers(field_columns, fields, row_group, chunk_size)
field_columns
.into_iter()
.zip(fields.into_iter())
.map(|(columns, field)| {
to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
})
.collect()
}
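To close the loop, a hedged sketch of how the iterators returned by `read_columns_many_async` might be consumed. `factory`, `row_group`, `fields`, and `RowGroupDeserializer` (assumed here to combine the per-field iterators into `Chunk`s) all come from outside this diff.

```rust
// One `ArrayIter` per requested field.
let column_chunks =
    read_columns_many_async(factory, row_group, fields.clone(), None).await?;

// Combine the per-field iterators into `Chunk`s of arrays. This part is CPU-bounded;
// consider a blocking/compute thread pool so the async executor stays responsive.
let deserializer =
    RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
for maybe_chunk in deserializer {
    let chunk = maybe_chunk?;
    println!("{} rows", chunk.len());
}
```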