From ff3e5b72c704d1e1fd7d4c0e28be35e61476989c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 9 Mar 2019 10:40:11 -0700 Subject: [PATCH] test --- rust/datafusion/src/datasource/parquet.rs | 61 ++++++++++++++++++++--- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index 1f5a9d53d634e..e2b50797fe55c 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -144,10 +144,16 @@ impl ParquetFile { let mut row_count = 0; for i in 0..self.column_readers.len() { let array: Arc = match self.column_readers[i] { + ColumnReader::BoolColumnReader(ref mut r) => { + return Err(ExecutionError::NotImplemented( + "unsupported column reader type (BOOL)".to_string(), + )); + } ColumnReader::Int32ColumnReader(ref mut r) => { let mut builder = Int32Builder::new(self.batch_size); let mut read_buffer: Vec = Vec::with_capacity(self.batch_size); + match r.read_batch( self.batch_size, None, @@ -156,6 +162,8 @@ impl ParquetFile { ) { //TODO this isn't handling null values Ok((count, _)) => { + println!("Read {} rows", count); + builder.append_slice(&read_buffer).unwrap(); row_count = count; Arc::new(builder.finish()) @@ -168,6 +176,21 @@ impl ParquetFile { } } } + ColumnReader::Int64ColumnReader(ref mut r) => { + return Err(ExecutionError::NotImplemented( + "unsupported column reader type (INT64)".to_string(), + )); + } + ColumnReader::Int96ColumnReader(ref mut r) => { + return Err(ExecutionError::NotImplemented( + "unsupported column reader type (INT96)".to_string(), + )); + } + ColumnReader::FloatColumnReader(ref mut r) => { + return Err(ExecutionError::NotImplemented( + "unsupported column reader type (FLOAT)".to_string(), + )); + } ColumnReader::DoubleColumnReader(ref mut r) => { let mut builder = Float64Builder::new(self.batch_size); let mut read_buffer: Vec = @@ -192,6 +215,12 @@ impl ParquetFile { } } } + ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => { + return Err(ExecutionError::NotImplemented( + "unsupported column reader type (FixedLenByteArray)" + .to_string(), + )); + } ColumnReader::ByteArrayColumnReader(ref mut r) => { let mut b: Vec = Vec::with_capacity(self.batch_size); @@ -221,17 +250,13 @@ impl ParquetFile { } } } - _ => { - return Err(ExecutionError::NotImplemented( - "unsupported column reader type".to_string(), - )); - } }; + println!("Adding array to batch"); batch.push(array); } - // println!("Loaded batch of {} rows", row_count); + println!("Loaded batch of {} rows", row_count); if row_count == 0 { Ok(None) @@ -308,3 +333,27 @@ impl RecordBatchIterator for ParquetFile { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + + #[test] + fn read_parquet_file() { + let testdata = env::var("PARQUET_TEST_DATA").unwrap(); + let filename = format!("{}/alltypes_plain.parquet", testdata); + + let table = ParquetTable::new(&filename); + + println!("{:?}", table.schema()); + + let projection = Some(vec![0]); + let scan = table.scan(&projection, 1024).unwrap(); + let mut it = scan.borrow_mut(); + let batch = it.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(1, batch.num_rows()); + } +}