Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async arrow parquet reader #1154

Merged
merged 7 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "arrow", "parquet", "hadoop" ]
keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
Expand All @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
futures = { version = "0.3", optional = true }
tustvold marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }

[dev-dependencies]
criterion = "0.3"
Expand All @@ -63,16 +65,18 @@ cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
# Experimental, unstable, async API
tustvold marked this conversation as resolved.
Show resolved Hide resolved
async = ["futures", "tokio"]

[[ bin ]]
[[bin]]
name = "parquet-read"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-schema"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]

Expand Down
60 changes: 38 additions & 22 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
pub trait ArrayReader: Send {
fn as_any(&self) -> &dyn Any;

/// Returns the arrow type of this array reader.
Expand All @@ -117,6 +117,26 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A collection of row groups
///
/// Supplies the parquet schema and per-column page iterators. Both
/// operations are fallible and report failure through `Result`.
pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;

/// Returns an iterator over the column chunks for particular column
///
/// `i` selects the column to read. NOTE(review): presumably this is the
/// column's index in the parquet schema — confirm against callers.
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroupCollection for Arc<dyn FileReader> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does mean we have double dynamic dispatch; however, given that these methods are only called a couple of times per file, I'm inclined to consider this largely irrelevant.

/// Returns the schema descriptor pointer held by this reader's file-level metadata.
fn schema(&self) -> Result<SchemaDescPtr> {
    let file_metadata = self.metadata().file_metadata();
    Ok(file_metadata.schema_descr_ptr())
}

/// Creates a boxed `FilePageIterator` for the column at `column_index`.
///
/// The iterator receives its own clone of the reader handle, and any error
/// from `FilePageIterator::new` is returned to the caller.
fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
    Ok(Box::new(FilePageIterator::new(
        column_index,
        Arc::clone(self),
    )?))
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
Expand Down Expand Up @@ -478,7 +498,7 @@ where
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -1311,9 +1331,9 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
arrow_schema: SchemaRef,
column_indices: T,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
where
T: IntoIterator<Item = usize>,
Expand Down Expand Up @@ -1351,13 +1371,8 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(
Arc::new(proj),
Arc::new(arrow_schema),
Arc::new(leaves),
file_reader,
)
.build_array_reader()
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
.build_array_reader()
}

/// Used to build array reader.
Expand All @@ -1367,7 +1382,7 @@ struct ArrayReaderBuilder {
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
}

/// Used in type visitor.
Expand Down Expand Up @@ -1667,13 +1682,13 @@ impl<'a> ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Arc<Schema>,
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
file_reader: Box<dyn RowGroupCollection>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
row_groups: file_reader,
}
}

Expand Down Expand Up @@ -1707,10 +1722,10 @@ impl<'a> ArrayReaderBuilder {
context.rep_level,
context.path.clone(),
));
let page_iterator = Box::new(FilePageIterator::new(
self.columns_included[&(cur_type.as_ref() as *const Type)],
self.file_reader.clone(),
)?);

let page_iterator = self
.row_groups
.column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;

let arrow_type: Option<ArrowType> = self
.get_arrow_field(&cur_type, context)
Expand Down Expand Up @@ -2823,7 +2838,8 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
Expand All @@ -2834,9 +2850,9 @@ mod tests {

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
Arc::new(arrow_schema),
vec![0usize].into_iter(),
file_reader,
Box::new(file_reader),
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
Arc::new(self.get_schema()?),
column_indices,
self.file_reader.clone(),
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
Expand Down
Loading