Skip to content

Commit

Permalink
convert to use row iter
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent f46e6f7 commit aea9f8a
Showing 1 changed file with 40 additions and 38 deletions.
78 changes: 40 additions & 38 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

use parquet::basic;
use parquet::column::reader::*;
use parquet::data_type::ByteArray;
use parquet::file::reader::*;
use parquet::schema::types::Type;
use parquet::record::{Row, RowAccessor};
use parquet::schema::types::Type;

use crate::datasource::{RecordBatchIterator, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};

pub struct ParquetTable {
filename: String,
Expand Down Expand Up @@ -121,27 +119,7 @@ impl ParquetFile {

fn load_next_row_group(&mut self) {
if self.row_group_index < self.reader.num_row_groups() {
//println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
let reader = self.reader.get_row_group(self.row_group_index).unwrap();

// self.column_readers = vec![];
//
// match &self.projection {
// None => {
// for i in 0..reader.num_columns() {
// self.column_readers
// .push(reader.get_column_reader(i).unwrap());
// }
// }
// Some(proj) => {
// for i in proj {
// //TODO validate index in bounds
// self.column_readers
// .push(reader.get_column_reader(*i).unwrap());
// }
// }
// }

self.current_row_group = Some(reader);
self.row_group_index += 1;
} else {
Expand All @@ -152,10 +130,9 @@ impl ParquetFile {
fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
match &self.current_row_group {
Some(reader) => {

// read batch of rows into memory

// let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
// let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();

let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
Expand All @@ -168,7 +145,8 @@ impl ParquetFile {
println!("Loaded {} rows into memory", rows.len());

// convert to columnar batch
let mut batch: Vec<Arc<Array>> = Vec::with_capacity(self.projection.len());
let mut batch: Vec<Arc<Array>> =
Vec::with_capacity(self.projection.len());
for i in &self.projection {
let array: Arc<Array> = match self.schema.field(*i).data_type() {
DataType::Int32 => {
Expand All @@ -179,17 +157,44 @@ impl ParquetFile {
}
Arc::new(builder.finish())
}
DataType::Float32 => {
let mut builder = Float32Builder::new(rows.len());
for row in &rows {
//TODO null handling
builder.append_value(row.get_float(*i).unwrap()).unwrap();
}
Arc::new(builder.finish())
}
DataType::Float64 => {
let mut builder = Float64Builder::new(rows.len());
for row in &rows {
//TODO null handling
builder
.append_value(row.get_double(*i).unwrap())
.unwrap();
}
Arc::new(builder.finish())
}
DataType::Utf8 => {
let mut builder = BinaryBuilder::new(rows.len());
for row in &rows {
//TODO null handling
let bytes = row.get_bytes(*i).unwrap();
builder.append_string(&String::from_utf8(bytes.data().to_vec()).unwrap()).unwrap();
builder
.append_string(
&String::from_utf8(bytes.data().to_vec())
.unwrap(),
)
.unwrap();
}
Arc::new(builder.finish())
}
other => return Err(ExecutionError::NotImplemented(
format!("unsupported column reader type ({:?})", other)))
other => {
return Err(ExecutionError::NotImplemented(format!(
"unsupported column reader type ({:?})",
other
)));
}
};
batch.push(array);
}
Expand All @@ -200,9 +205,9 @@ impl ParquetFile {
Ok(None)
} else {
Ok(Some(RecordBatch::try_new(
self.schema.projection(&self.projection)?,
batch,
)?))
self.schema.projection(&self.projection)?,
batch,
)?))
}
}
_ => Ok(None),
Expand Down Expand Up @@ -299,14 +304,11 @@ mod tests {
.downcast_ref::<Int32Array>()
.unwrap();
let mut values: Vec<i32> = vec![];
for i in 0..16 {
for i in 0..batch.num_rows() {
values.push(array.value(i));
}

assert_eq!(
"[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 9, 0, 1, 0, 0, 0]",
format!("{:?}", values)
);
assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
}

#[test]
Expand Down

0 comments on commit aea9f8a

Please sign in to comment.