From eaddafbf9d41437b065724bc4f4ef4e6427229cd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 9 Mar 2019 14:21:29 -0700
Subject: [PATCH] save

---
 rust/arrow/src/datatypes.rs               |   8 +
 rust/datafusion/src/datasource/parquet.rs | 319 ++++++++++++----------
 2 files changed, 188 insertions(+), 139 deletions(-)

diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index e0b6d706eab15..69ce7114790c8 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -26,6 +26,7 @@ use std::mem::size_of;
 use std::ops::{Add, Div, Mul, Sub};
 use std::slice::from_raw_parts;
 use std::str::FromStr;
+use std::sync::Arc;
 
 use packed_simd::*;
 use serde_derive::{Deserialize, Serialize};
@@ -751,6 +752,13 @@ impl Schema {
             "fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
         })
     }
+
+    /// Create a new schema by applying a projection to this schema's fields
+    pub fn projection(&self, projection: &Vec<usize>) -> Result<Arc<Schema>> {
+        //TODO bounds checks
+        let fields = projection.iter().map(|index| self.field(*index).clone()).collect();
+        Ok(Arc::new(Schema::new(fields)))
+    }
 }
 
 impl fmt::Display for Schema {
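Note: the new Schema::projection helper above builds a schema containing just the
selected fields, in the order given. A minimal usage sketch, not part of the patch
(the field names here are hypothetical):

    use arrow::datatypes::{DataType, Field, Schema};

    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Float64, false),
    ]);

    // keep only columns 0 and 2 ("id" and "score")
    let projected = schema.projection(&vec![0, 2]).unwrap();
    assert_eq!(2, projected.fields().len());
    assert_eq!("score", projected.field(1).name().as_str());

As the //TODO notes, an out-of-range index currently panics inside Schema::field
rather than returning an Err.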
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 69ec57bef73f3..b933e24859911 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -32,6 +32,7 @@ use parquet::column::reader::*;
 use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
 use parquet::schema::types::Type;
+use parquet::record::{Row, RowAccessor};
 
 use crate::datasource::{RecordBatchIterator, Table};
 use crate::execution::error::{ExecutionError, Result};
@@ -73,11 +74,11 @@ impl Table for ParquetTable {
 pub struct ParquetFile {
     reader: SerializedFileReader<File>,
     row_group_index: usize,
+    /// The schema of the underlying file
     schema: Arc<Schema>,
-    projection: Option<Vec<usize>>,
+    projection: Vec<usize>,
     batch_size: usize,
     current_row_group: Option<Box<RowGroupReader>>,
-    column_readers: Vec<ColumnReader>,
 }
 
 impl ParquetFile {
@@ -91,6 +92,18 @@ impl ParquetFile {
             DataType::Struct(fields) => {
                 let schema = Schema::new(fields.clone());
                 //println!("Parquet schema: {:?}", schema);
+
+                let projection = match projection {
+                    Some(p) => p,
+                    None => {
+                        let mut p = Vec::with_capacity(schema.fields().len());
+                        for i in 0..schema.fields().len() {
+                            p.push(i);
+                        }
+                        p
+                    }
+                };
+
                 Ok(ParquetFile {
                     reader: reader,
                     row_group_index: 0,
@@ -98,7 +111,6 @@ impl ParquetFile {
                     projection,
                     batch_size: 64 * 1024,
                     current_row_group: None,
-                    column_readers: vec![],
                 })
             }
             _ => Err(ExecutionError::General(
@@ -112,23 +124,23 @@
         //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
         let reader = self.reader.get_row_group(self.row_group_index).unwrap();
 
-        self.column_readers = vec![];
-
-        match &self.projection {
-            None => {
-                for i in 0..reader.num_columns() {
-                    self.column_readers
-                        .push(reader.get_column_reader(i).unwrap());
-                }
-            }
-            Some(proj) => {
-                for i in proj {
-                    //TODO validate index in bounds
-                    self.column_readers
-                        .push(reader.get_column_reader(*i).unwrap());
-                }
-            }
-        }
-
         self.current_row_group = Some(reader);
         self.row_group_index += 1;
@@ -140,139 +152,168 @@ impl ParquetFile {
     fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
         match &self.current_row_group {
             Some(reader) => {
-                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
-                let mut row_count = 0;
-                for i in 0..self.column_readers.len() {
-                    let array: Arc<Array> = match self.column_readers[i] {
-                        ColumnReader::BoolColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (BOOL)".to_string(),
-                            ));
-                        }
-                        ColumnReader::Int32ColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<i32> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0);
-                            }
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    println!("Read {} rows", count);
-                                    let mut builder = Int32Builder::new(self.batch_size);
-                                    builder.append_slice(&read_buffer).unwrap();
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
-                        }
-                        ColumnReader::Int64ColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (INT64)".to_string(),
-                            ));
-                        }
-                        ColumnReader::Int96ColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (INT96)".to_string(),
-                            ));
-                        }
-                        ColumnReader::FloatColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (FLOAT)".to_string(),
-                            ));
-                        }
-                        ColumnReader::DoubleColumnReader(ref mut r) => {
-                            let mut builder = Float64Builder::new(self.batch_size);
-                            let mut read_buffer: Vec<f64> =
-                                Vec::with_capacity(self.batch_size);
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    builder.append_slice(&read_buffer).unwrap();
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
-                        }
-                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (FixedLenByteArray)"
-                                    .to_string(),
-                            ));
-                        }
-                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
-                            let mut b: Vec<ByteArray> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                b.push(ByteArray::default());
-                            }
-                            match r.read_batch(self.batch_size, None, None, &mut b) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    row_count = count;
-                                    //TODO this is horribly inefficient
-                                    let mut builder = BinaryBuilder::new(row_count);
-                                    for j in 0..row_count {
-                                        let foo = b[j].slice(0, b[j].len());
-                                        let bytes: &[u8] = foo.data();
-                                        let str =
-                                            String::from_utf8(bytes.to_vec()).unwrap();
-                                        builder.append_string(&str).unwrap();
-                                    }
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
-                        }
+                // read a batch of rows into memory
+                //TODO projection push down, e.g. by mapping the projection
+                // through reader.metadata().schema_descr().column(*i)
+                let mut row_iter = reader.get_row_iter(None).unwrap();
+                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
+                while let Some(row) = row_iter.next() {
+                    if rows.len() == self.batch_size {
+                        break;
+                    }
+                    rows.push(row);
+                }
+                println!("Loaded {} rows into memory", rows.len());
+
+                // convert to columnar batch
+                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(self.projection.len());
+                for i in &self.projection {
+                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
+                        DataType::Int32 => {
+                            //TODO null handling
+                            let mut builder = Int32Builder::new(rows.len());
+                            for row in &rows {
+                                builder.append_value(row.get_int(*i).unwrap()).unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        DataType::Utf8 => {
+                            //TODO null handling
+                            let mut builder = BinaryBuilder::new(rows.len());
+                            for row in &rows {
+                                let bytes = row.get_bytes(*i).unwrap();
+                                builder
+                                    .append_string(&String::from_utf8(bytes.data().to_vec()).unwrap())
+                                    .unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        other => {
+                            return Err(ExecutionError::NotImplemented(format!(
+                                "unsupported column type {:?}",
+                                other
+                            )));
+                        }
                     };
 
                     batch.push(array);
                 }
-                println!("Loaded batch of {} rows", row_count);
+                println!("Loaded batch of {} rows", rows.len());
 
-                if row_count == 0 {
+                if rows.is_empty() {
                     Ok(None)
                 } else {
-                    match &self.projection {
-                        Some(proj) => Ok(Some(RecordBatch::try_new(
-                            self.schema.projection(proj)?,
-                            batch,
-                        )?)),
-                        None => {
-                            Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
-                        }
-                    }
+                    Ok(Some(RecordBatch::try_new(
+                        self.schema.projection(&self.projection)?,
+                        batch,
+                    )?))
                 }
             }
             _ => Ok(None),
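Note on the two //TODO null handling markers above: one way to make the row-based
conversion null-aware is to treat a failed typed read as a null slot in the builder.
A minimal sketch, not part of the patch (int32_column is a hypothetical helper; it
assumes RowAccessor::get_int returns Err when the field is null, which also conflates
genuine nulls with type mismatches, so robust null support would more likely come
from the definition levels exposed by the low-level column readers):

    use std::sync::Arc;

    use arrow::array::{Array, Int32Builder};
    use parquet::record::{Row, RowAccessor};

    // Build an Int32 array from column `i` of the buffered rows, mapping any
    // row whose value cannot be read as an i32 to a null entry.
    fn int32_column(rows: &[Row], i: usize) -> Arc<Array> {
        let mut builder = Int32Builder::new(rows.len());
        for row in rows {
            match row.get_int(i) {
                Ok(value) => builder.append_value(value).unwrap(),
                Err(_) => builder.append_null().unwrap(),
            }
        }
        Arc::new(builder.finish())
    }

The DataType::Int32 arm of load_batch could then shrink to int32_column(&rows, *i),
with an equivalent helper for Utf8.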