Skip to content

Commit

Permalink
replace read_column! macro with generic
Browse files Browse the repository at this point in the history
  • Loading branch information
nevi-me authored and andygrove committed Mar 14, 2019
1 parent 607a29f commit e6cbbaa
Showing 1 changed file with 77 additions and 41 deletions.
118 changes: 77 additions & 41 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use std::fs::File;
use std::string::String;
use std::sync::{Arc, Mutex};

use arrow::array::Array;
use arrow::datatypes::{DataType, Schema};
use arrow::array::{Array, PrimitiveArray};
use arrow::builder::PrimitiveBuilder;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;

use parquet::column::reader::*;
Expand All @@ -32,9 +33,7 @@ use parquet::file::reader::*;
use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::array::BinaryArray;
use arrow::builder::BooleanBuilder;
use arrow::builder::Int64Builder;
use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
use arrow::builder::{BinaryBuilder, Int64Builder};
use parquet::data_type::Int96;
use parquet::reader::schema::parquet_to_arrow_schema;

Expand Down Expand Up @@ -91,12 +90,13 @@ fn create_binary_array(b: &Vec<ByteArray>, row_count: usize) -> Result<Arc<Binar
Ok(Arc::new(builder.finish()))
}

macro_rules! read_column {
($SELF:ident, $R:ident, $INDEX:expr, $BUILDER:ident, $TY:ident, $DEFAULT:expr) => {{
macro_rules! read_binary_column {
($SELF:ident, $R:ident, $INDEX:expr) => {{
//TODO: should be able to get num_rows in row group instead of defaulting to batch size
let mut read_buffer: Vec<$TY> = Vec::with_capacity($SELF.batch_size);
let mut read_buffer: Vec<ByteArray> =
Vec::with_capacity($SELF.batch_size);
for _ in 0..$SELF.batch_size {
read_buffer.push($DEFAULT);
read_buffer.push(ByteArray::default());
}
if $SELF.schema.field($INDEX).is_nullable() {

Expand All @@ -111,63 +111,83 @@ macro_rules! read_column {
None,
&mut read_buffer,
)?;
let mut builder = $BUILDER::new(levels_read);
if values_read == levels_read {
builder.append_slice(&read_buffer[0..values_read])?;
create_binary_array(&read_buffer, values_read)?
} else {
return Err(ExecutionError::NotImplemented("Parquet datasource does not support null values".to_string()))
}
Arc::new(builder.finish())
} else {
let (values_read, _) = $R.read_batch(
$SELF.batch_size,
None,
None,
&mut read_buffer,
)?;
let mut builder = $BUILDER::new(values_read);
builder.append_slice(&read_buffer[0..values_read])?;
Arc::new(builder.finish())
create_binary_array(&read_buffer, values_read)?
}
}}
}

macro_rules! read_binary_column {
($SELF:ident, $R:ident, $INDEX:expr) => {{
//TODO: should be able to get num_rows in row group instead of defaulting to batch size
let mut read_buffer: Vec<ByteArray> =
Vec::with_capacity($SELF.batch_size);
for _ in 0..$SELF.batch_size {
read_buffer.push(ByteArray::default());
trait ArrowReader<T> where T: ArrowPrimitiveType {
fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<A,P> ArrowReader<A> for ColumnReaderImpl<P>
where
A: ArrowPrimitiveType,
P: parquet::data_type::DataType,
P::T: std::convert::From<A::Native>,
A::Native: std::convert::From<P::T>,
{
fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<A>>> {

// create read buffer
let mut read_buffer: Vec<P::T> = Vec::with_capacity(batch_size);

for _ in 0..batch_size {
read_buffer.push(A::default_value().into());
}
if $SELF.schema.field($INDEX).is_nullable() {

let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
for _ in 0..$SELF.batch_size {
if is_nullable {
let mut def_levels: Vec<i16> = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
def_levels.push(0);
}

let (values_read, levels_read) = $R.read_batch(
$SELF.batch_size,
let (values_read, levels_read) = self.read_batch(
batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;
let mut builder = PrimitiveBuilder::<A>::new(levels_read);
if values_read == levels_read {
create_binary_array(&read_buffer, values_read)?
let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
builder.append_slice(&converted_buffer[0..values_read])?;
} else {
return Err(ExecutionError::NotImplemented("Parquet datasource does not support null values".to_string()))
for (v, l) in read_buffer.into_iter().zip(def_levels) {
if l == 0 {
builder.append_value(v.into())?;
} else {
builder.append_null()?;
}
}
}
Ok(Arc::new(builder.finish()))
} else {
let (values_read, _) = $R.read_batch(
$SELF.batch_size,
None,
None,
&mut read_buffer,
let (values_read, _) = self.read_batch(
batch_size,
None,
None,
&mut read_buffer,
)?;
create_binary_array(&read_buffer, values_read)?

let mut builder = PrimitiveBuilder::<A>::new(values_read);
let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
builder.append_slice(&converted_buffer[0..values_read])?;
Ok(Arc::new(builder.finish()))
}
}}
}
}

impl ParquetFile {
Expand Down Expand Up @@ -243,15 +263,25 @@ impl ParquetFile {
Some(reader) => {
let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
for i in 0..self.column_readers.len() {
let is_nullable = self.schema().field(i).is_nullable();
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut r) => {
read_column!(self, r, i, BooleanBuilder, bool, false)
match ArrowReader::<BooleanType>::read(r, self.batch_size, is_nullable) {
Ok(array) => array,
Err(e) => return Err(e)
}
}
ColumnReader::Int32ColumnReader(ref mut r) => {
read_column!(self, r, i, Int32Builder, i32, 0)
match ArrowReader::<Int32Type>::read(r, self.batch_size, is_nullable) {
Ok(array) => array,
Err(e) => return Err(e)
}
}
ColumnReader::Int64ColumnReader(ref mut r) => {
read_column!(self, r, i, Int64Builder, i64, 0)
match ArrowReader::<Int64Type>::read(r, self.batch_size, is_nullable) {
Ok(array) => array,
Err(e) => return Err(e)
}
}
ColumnReader::Int96ColumnReader(ref mut r) => {
let mut read_buffer: Vec<Int96> =
Expand Down Expand Up @@ -314,10 +344,16 @@ impl ParquetFile {
}
}
ColumnReader::FloatColumnReader(ref mut r) => {
read_column!(self, r, i, Float32Builder, f32, 0_f32)
match ArrowReader::<Float32Type>::read(r, self.batch_size, is_nullable) {
Ok(array) => array,
Err(e) => return Err(e)
}
}
ColumnReader::DoubleColumnReader(ref mut r) => {
read_column!(self, r, i, Float64Builder, f64, 0_f64)
match ArrowReader::<Float64Type>::read(r, self.batch_size, is_nullable) {
Ok(array) => array,
Err(e) => return Err(e)
}
}
ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
read_binary_column!(self, r, i)
Expand Down

0 comments on commit e6cbbaa

Please sign in to comment.