Skip to content

Commit

Permalink
Add async arrow parquet reader (#1154)
Browse files Browse the repository at this point in the history
* Async parquet reader (#111)

Add Sync + Send bounds to parquet crate

* Remove Sync from DataType

* Review feedback

* Add basic test

* Fix lints

* Review feedback

* Tweak CI
  • Loading branch information
tustvold authored Feb 2, 2022
1 parent f80f1be commit 91d12ec
Show file tree
Hide file tree
Showing 17 changed files with 579 additions and 58 deletions.
25 changes: 17 additions & 8 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,31 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"
# run tests on all workspace members with default feature list
cargo test
# Switch to arrow crate
cd arrow
# re-run tests on arrow workspace with additional features
# re-run tests on arrow crate with additional features
cargo test --features=prettyprint
# run test on arrow with minimal set of features
# run test on arrow crate with minimal set of features
cargo test --no-default-features
cargo run --example builders
cargo run --example dynamic_types
cargo run --example read_csv
cargo run --example read_csv_infer_schema
# Exit arrow directory
cd ..
(cd parquet && cargo check --no-default-features)
(cd arrow && cargo check --no-default-features)
(cd arrow-flight && cargo check --no-default-features)
cargo check --no-default-features
# Switch to parquet crate
cd ../parquet
# re-run tests on parquet crate with async feature enabled
cargo test --features=async
cargo check --no-default-features
# Switch to arrow-flight
cd ../arrow-flight
cargo check --no-default-features
# test the --features "simd" of the arrow crate. This requires nightly.
linux-test-simd:
Expand Down Expand Up @@ -237,7 +246,7 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"
cargo clippy --features test_common --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
cargo clippy --features test_common --features prettyprint --features=async --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
check_benches:
name: Check Benchmarks (but don't run them)
Expand Down
12 changes: 8 additions & 4 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "arrow", "parquet", "hadoop" ]
keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
Expand All @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
futures = { version = "0.3", optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }

[dev-dependencies]
criterion = "0.3"
Expand All @@ -63,16 +65,18 @@ cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
# Enable async API
async = ["futures", "tokio"]

[[ bin ]]
[[bin]]
name = "parquet-read"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-schema"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]

Expand Down
60 changes: 38 additions & 22 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
pub trait ArrayReader: Send {
fn as_any(&self) -> &dyn Any;

/// Returns the arrow type of this array reader.
Expand All @@ -117,6 +117,26 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A collection of row groups
pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;

/// Returns an iterator over the column chunks for particular column
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroupCollection for Arc<dyn FileReader> {
fn schema(&self) -> Result<SchemaDescPtr> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
}

fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
Expand Down Expand Up @@ -482,7 +502,7 @@ where
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -1315,9 +1335,9 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
arrow_schema: SchemaRef,
column_indices: T,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
where
T: IntoIterator<Item = usize>,
Expand Down Expand Up @@ -1355,13 +1375,8 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(
Arc::new(proj),
Arc::new(arrow_schema),
Arc::new(leaves),
file_reader,
)
.build_array_reader()
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
.build_array_reader()
}

/// Used to build array reader.
Expand All @@ -1371,7 +1386,7 @@ struct ArrayReaderBuilder {
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
}

/// Used in type visitor.
Expand Down Expand Up @@ -1671,13 +1686,13 @@ impl<'a> ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Arc<Schema>,
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
file_reader: Box<dyn RowGroupCollection>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
row_groups: file_reader,
}
}

Expand Down Expand Up @@ -1711,10 +1726,10 @@ impl<'a> ArrayReaderBuilder {
context.rep_level,
context.path.clone(),
));
let page_iterator = Box::new(FilePageIterator::new(
self.columns_included[&(cur_type.as_ref() as *const Type)],
self.file_reader.clone(),
)?);

let page_iterator = self
.row_groups
.column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;

let arrow_type: Option<ArrowType> = self
.get_arrow_field(&cur_type, context)
Expand Down Expand Up @@ -2827,7 +2842,8 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
Expand All @@ -2838,9 +2854,9 @@ mod tests {

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
Arc::new(arrow_schema),
vec![0usize].into_iter(),
file_reader,
Box::new(file_reader),
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
Arc::new(self.get_schema()?),
column_indices,
self.file_reader.clone(),
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
Expand Down
Loading

0 comments on commit 91d12ec

Please sign in to comment.