From 3a412b18a704d83e15bc8be6f114a4770e9d1bb4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 9 Mar 2019 11:00:06 -0700 Subject: [PATCH] first parquet test passes --- rust/datafusion/src/datasource/parquet.rs | 47 ++++++++++++++++++----- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index e2b50797fe55c..241ab7a3cf571 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -144,16 +144,19 @@ impl ParquetFile { let mut row_count = 0; for i in 0..self.column_readers.len() { let array: Arc<Array> = match self.column_readers[i] { - ColumnReader::BoolColumnReader(ref mut r) => { + ColumnReader::BoolColumnReader(ref mut _r) => { return Err(ExecutionError::NotImplemented( "unsupported column reader type (BOOL)".to_string(), )); } ColumnReader::Int32ColumnReader(ref mut r) => { - let mut builder = Int32Builder::new(self.batch_size); let mut read_buffer: Vec<i32> = Vec::with_capacity(self.batch_size); + for _ in 0..self.batch_size { + read_buffer.push(0); + } + match r.read_batch( self.batch_size, None, @@ -163,7 +166,7 @@ impl ParquetFile { //TODO this isn't handling null values Ok((count, _)) => { println!("Read {} rows", count); - + let mut builder = Int32Builder::new(self.batch_size); builder.append_slice(&read_buffer).unwrap(); row_count = count; Arc::new(builder.finish()) @@ -176,17 +179,17 @@ impl ParquetFile { } } } - ColumnReader::Int64ColumnReader(ref mut r) => { + ColumnReader::Int64ColumnReader(ref mut _r) => { return Err(ExecutionError::NotImplemented( "unsupported column reader type (INT64)".to_string(), )); } - ColumnReader::Int96ColumnReader(ref mut r) => { + ColumnReader::Int96ColumnReader(ref mut _r) => { return Err(ExecutionError::NotImplemented( "unsupported column reader type (INT96)".to_string(), )); } - ColumnReader::FloatColumnReader(ref mut r) => { + ColumnReader::FloatColumnReader(ref mut _r) => { return 
Err(ExecutionError::NotImplemented( "unsupported column reader type (FLOAT)".to_string(), )); @@ -215,7 +218,7 @@ impl ParquetFile { } } } - ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => { + ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => { return Err(ExecutionError::NotImplemented( "unsupported column reader type (FixedLenByteArray)" .to_string(), )); @@ -261,7 +264,15 @@ impl ParquetFile { if row_count == 0 { Ok(None) } else { - Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?)) + match &self.projection { + Some(proj) => Ok(Some(RecordBatch::try_new( + self.schema.projection(proj)?, + batch, + )?)), + None => { + Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?)) + } + } } } _ => Ok(None), @@ -337,10 +348,11 @@ impl RecordBatchIterator for ParquetFile { #[cfg(test)] mod tests { use super::*; + use arrow::array::Int32Array; use std::env; #[test] - fn read_parquet_file() { + fn read_i32_column() { let testdata = env::var("PARQUET_TEST_DATA").unwrap(); let filename = format!("{}/alltypes_plain.parquet", testdata); @@ -354,6 +366,21 @@ mod tests { let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); - assert_eq!(1, batch.num_rows()); + assert_eq!(64 * 1024, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + let mut values: Vec<i32> = vec![]; + for i in 0..16 { + values.push(array.value(i)); + } + + assert_eq!( + "[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]", + format!("{:?}", values) + ); } }