Skip to content

Commit

Permalink
Merge 38e2225 into aac1844
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Jan 31, 2022
2 parents aac1844 + 38e2225 commit 80c1978
Show file tree
Hide file tree
Showing 18 changed files with 570 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
cargo test
cd arrow
# re-run tests on arrow workspace with additional features
cargo test --features=prettyprint
cargo test --features=prettyprint --features=async
# run test on arrow with minimal set of features
cargo test --no-default-features
cargo run --example builders
Expand Down Expand Up @@ -238,7 +238,7 @@ jobs:
run: |
export CARGO_HOME="/github/home/.cargo"
export CARGO_TARGET_DIR="/github/home/target"
cargo clippy --features test_common --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
cargo clippy --features test_common --features prettyprint --features=async --all-targets --workspace -- -D warnings -A clippy::redundant_field_names
check_benches:
name: Check Benchmarks (but don't run them)
Expand Down
10 changes: 5 additions & 5 deletions arrow/src/util/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(Cell::new(&array_value_to_string(&column, row)?));
cells.push(Cell::new(&array_value_to_string(column, row)?));
}
table.add_row(cells);
}
Expand All @@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result<Table> {

for col in columns {
for row in 0..col.len() {
let cells = vec![Cell::new(&array_value_to_string(&col, row)?)];
let cells = vec![Cell::new(&array_value_to_string(col, row)?)];
table.add_row(cells);
}
}
Expand Down Expand Up @@ -320,7 +320,7 @@ mod tests {
let mut builder = FixedSizeBinaryBuilder::new(3, 3);

builder.append_value(&[1, 2, 3]).unwrap();
builder.append_null();
builder.append_null().unwrap();
builder.append_value(&[7, 8, 9]).unwrap();

let array = Arc::new(builder.finish());
Expand Down Expand Up @@ -677,7 +677,7 @@ mod tests {
)?;

let mut buf = String::new();
write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap();
write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();

let s = vec![
"+---+-----+",
Expand All @@ -689,7 +689,7 @@ mod tests {
"| d | 100 |",
"+---+-----+",
];
let expected = String::from(s.join("\n"));
let expected = s.join("\n");
assert_eq!(expected, buf);

Ok(())
Expand Down
14 changes: 9 additions & 5 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ description = "Apache Parquet implementation in Rust"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "arrow", "parquet", "hadoop" ]
keywords = ["arrow", "parquet", "hadoop"]
readme = "README.md"
build = "build.rs"
edition = "2021"
Expand All @@ -45,6 +45,8 @@ base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"
futures = { version = "0.3", optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] }

[dev-dependencies]
criterion = "0.3"
Expand All @@ -55,24 +57,26 @@ brotli = "3.3"
flate2 = "1.0"
lz4 = "1.23"
serde_json = { version = "1.0", features = ["preserve_order"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
cli = ["serde_json", "base64", "clap"]
test_common = []
# Experimental, unstable functionality primarily used for testing
experimental = []
# Enable async API
async = ["futures", "tokio"]

[[ bin ]]
[[bin]]
name = "parquet-read"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-schema"
required-features = ["cli"]

[[ bin ]]
[[bin]]
name = "parquet-rowcount"
required-features = ["cli"]

Expand Down
60 changes: 38 additions & 22 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use arrow::datatypes::{
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
Time32MillisecondType as ArrowTime32MillisecondType,
SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
pub trait ArrayReader: Send {
fn as_any(&self) -> &dyn Any;

/// Returns the arrow type of this array reader.
Expand All @@ -117,6 +117,26 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A collection of row groups
pub trait RowGroupCollection {
/// Get schema of parquet file.
fn schema(&self) -> Result<SchemaDescPtr>;

/// Returns an iterator over the column chunks for particular column
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroupCollection for Arc<dyn FileReader> {
fn schema(&self) -> Result<SchemaDescPtr> {
Ok(self.metadata().file_metadata().schema_descr_ptr())
}

fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than batch_size if
Expand Down Expand Up @@ -482,7 +502,7 @@ where
impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
where
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -1315,9 +1335,9 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
arrow_schema: SchemaRef,
column_indices: T,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
) -> Result<Box<dyn ArrayReader>>
where
T: IntoIterator<Item = usize>,
Expand Down Expand Up @@ -1355,13 +1375,8 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(
Arc::new(proj),
Arc::new(arrow_schema),
Arc::new(leaves),
file_reader,
)
.build_array_reader()
ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
.build_array_reader()
}

/// Used to build array reader.
Expand All @@ -1371,7 +1386,7 @@ struct ArrayReaderBuilder {
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
row_groups: Box<dyn RowGroupCollection>,
}

/// Used in type visitor.
Expand Down Expand Up @@ -1671,13 +1686,13 @@ impl<'a> ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Arc<Schema>,
columns_included: Arc<HashMap<*const Type, usize>>,
file_reader: Arc<dyn FileReader>,
file_reader: Box<dyn RowGroupCollection>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
row_groups: file_reader,
}
}

Expand Down Expand Up @@ -1711,10 +1726,10 @@ impl<'a> ArrayReaderBuilder {
context.rep_level,
context.path.clone(),
));
let page_iterator = Box::new(FilePageIterator::new(
self.columns_included[&(cur_type.as_ref() as *const Type)],
self.file_reader.clone(),
)?);

let page_iterator = self
.row_groups
.column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;

let arrow_type: Option<ArrowType> = self
.get_arrow_field(&cur_type, context)
Expand Down Expand Up @@ -2827,7 +2842,8 @@ mod tests {
#[test]
fn test_create_array_reader() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let file_reader: Arc<dyn FileReader> =
Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
Expand All @@ -2838,9 +2854,9 @@ mod tests {

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
Arc::new(arrow_schema),
vec![0usize].into_iter(),
file_reader,
Box::new(file_reader),
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
Arc::new(self.get_schema()?),
column_indices,
self.file_reader.clone(),
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
Expand Down
Loading

0 comments on commit 80c1978

Please sign in to comment.