
Commit

save
andygrove committed Mar 14, 2019
1 parent 322fc87 commit eaddafb
Showing 2 changed files with 188 additions and 139 deletions.
8 changes: 8 additions & 0 deletions rust/arrow/src/datatypes.rs
@@ -26,6 +26,7 @@ use std::mem::size_of;
use std::ops::{Add, Div, Mul, Sub};
use std::slice::from_raw_parts;
use std::str::FromStr;
+use std::sync::Arc;

use packed_simd::*;
use serde_derive::{Deserialize, Serialize};
@@ -751,6 +752,13 @@ impl Schema {
"fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
})
}

+    /// Create a new schema containing a projection of this schema's fields,
+    /// i.e. the fields at the given indices, in the given order
+    pub fn projection(&self, indices: &[usize]) -> Result<Arc<Schema>> {
+        //TODO bounds checks - this panics if an index is out of range
+        let fields = indices.iter().map(|i| self.field(*i).clone()).collect();
+        Ok(Arc::new(Schema::new(fields)))
+    }
}

impl fmt::Display for Schema {
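The new Schema::projection method is easiest to see against a throwaway schema. A minimal usage sketch (the schema and field names here are illustrative, not part of the commit):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

// An illustrative three-column schema.
let schema = Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("name", DataType::Utf8, false),
    Field::new("score", DataType::Float64, false),
]);

// Select columns 2 and 0, in that order.
let projected: Arc<Schema> = schema.projection(&[2, 0]).unwrap();
assert_eq!(2, projected.fields().len());
assert_eq!("score", projected.field(0).name());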
319 changes: 180 additions & 139 deletions rust/datafusion/src/datasource/parquet.rs
@@ -32,6 +32,7 @@ use parquet::column::reader::*;
use parquet::data_type::ByteArray;
use parquet::file::reader::*;
use parquet::schema::types::Type;
+use parquet::record::{Row, RowAccessor};

use crate::datasource::{RecordBatchIterator, Table};
use crate::execution::error::{ExecutionError, Result};
@@ -73,11 +74,11 @@ impl Table for ParquetTable
pub struct ParquetFile {
reader: SerializedFileReader<File>,
row_group_index: usize,
+    /// The schema of the underlying file
schema: Arc<Schema>,
-    projection: Option<Vec<usize>>,
+    projection: Vec<usize>,
batch_size: usize,
current_row_group: Option<Box<RowGroupReader>>,
-    column_readers: Vec<ColumnReader>,
}

impl ParquetFile {
@@ -91,14 +92,25 @@ impl ParquetFile {
DataType::Struct(fields) => {
let schema = Schema::new(fields.clone());
//println!("Parquet schema: {:?}", schema);

+                let projection = match projection {
+                    Some(p) => p,
+                    None => {
+                        let mut p = Vec::with_capacity(schema.fields().len());
+                        for i in 0..schema.fields().len() {
+                            p.push(i);
+                        }
+                        p
+                    }
+                };

Ok(ParquetFile {
reader: reader,
row_group_index: 0,
schema: Arc::new(schema),
projection,
batch_size: 64 * 1024,
current_row_group: None,
-                    column_readers: vec![],
})
}
_ => Err(ExecutionError::General(
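With the projection materialized in the constructor, ParquetFile no longer stores an Option and the rest of the file can index fields directly. The None branch above builds the identity projection with an explicit loop; an equivalent, more compact form of the same default (a style suggestion only, same behavior):

// Default to projecting every column when none is specified.
let projection = match projection {
    Some(p) => p,
    None => (0..schema.fields().len()).collect(),
};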
@@ -112,23 +124,23 @@ impl ParquetFile {
//println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
let reader = self.reader.get_row_group(self.row_group_index).unwrap();

-        self.column_readers = vec![];
-
-        match &self.projection {
-            None => {
-                for i in 0..reader.num_columns() {
-                    self.column_readers
-                        .push(reader.get_column_reader(i).unwrap());
-                }
-            }
-            Some(proj) => {
-                for i in proj {
-                    //TODO validate index in bounds
-                    self.column_readers
-                        .push(reader.get_column_reader(*i).unwrap());
-                }
-            }
-        }
+        // self.column_readers = vec![];
+        //
+        // match &self.projection {
+        //     None => {
+        //         for i in 0..reader.num_columns() {
+        //             self.column_readers
+        //                 .push(reader.get_column_reader(i).unwrap());
+        //         }
+        //     }
+        //     Some(proj) => {
+        //         for i in proj {
+        //             //TODO validate index in bounds
+        //             self.column_readers
+        //                 .push(reader.get_column_reader(*i).unwrap());
+        //         }
+        //     }
+        // }

self.current_row_group = Some(reader);
self.row_group_index += 1;
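The rewrite switches from per-column ColumnReader batches to the parquet crate's row-oriented API. For orientation, a minimal standalone sketch of that API (the file name is illustrative; get_row_iter is also available on each RowGroupReader, which is what load_batch below uses):

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Open a Parquet file and iterate over its rows one at a time.
let file = File::open("example.parquet").unwrap();
let reader = SerializedFileReader::new(file).unwrap();
let mut row_iter = reader.get_row_iter(None).unwrap();
while let Some(row) = row_iter.next() {
    println!("{:?}", row);
}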
@@ -140,139 +152,168 @@ impl ParquetFile {
fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
match &self.current_row_group {
Some(reader) => {
-                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
-                let mut row_count = 0;
-                for i in 0..self.column_readers.len() {
-                    let array: Arc<Array> = match self.column_readers[i] {
-                        ColumnReader::BoolColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (BOOL)".to_string(),
-                            ));
-                        }
-                        ColumnReader::Int32ColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<i32> =
-                                Vec::with_capacity(self.batch_size);
-
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0);
-                            }
-                            // read batch of rows into memory
-
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    println!("Read {} rows", count);
-                                    let mut builder = Int32Builder::new(self.batch_size);
-                                    builder.append_slice(&read_buffer).unwrap();
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
-                        }
-                        ColumnReader::Int64ColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (INT64)".to_string(),
-                            ));
-                        }
-                        ColumnReader::Int96ColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (INT96)".to_string(),
-                            ));
-                        }
-                        ColumnReader::FloatColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (FLOAT)".to_string(),
-                            ));
-                        }
-                        ColumnReader::DoubleColumnReader(ref mut r) => {
-                            let mut builder = Float64Builder::new(self.batch_size);
-                            let mut read_buffer: Vec<f64> =
-                                Vec::with_capacity(self.batch_size);
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    builder.append_slice(&read_buffer).unwrap();
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
+                // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();

+                let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
+                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
+                while let Some(row) = row_iter.next() {
+                    if rows.len() == self.batch_size {
+                        break;
+                    }
+                    rows.push(row);
+                }
+                println!("Loaded {} rows into memory", rows.len());

+                // convert to columnar batch
+                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(self.projection.len());
+                for i in &self.projection {
+                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
+                        DataType::Int32 => {
+                            //TODO null handling - get_int returns Err for null values
+                            let mut builder = Int32Builder::new(rows.len());
+                            for row in &rows {
+                                builder.append(row.get_int(*i).unwrap()).unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
-                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
-                            return Err(ExecutionError::NotImplemented(
-                                "unsupported column reader type (FixedLenByteArray)"
-                                    .to_string(),
-                            ));
-                        }
-                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
-                            let mut b: Vec<ByteArray> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                b.push(ByteArray::default());
-                            }
-                            match r.read_batch(self.batch_size, None, None, &mut b) {
-                                //TODO this isn't handling null values
-                                Ok((count, _)) => {
-                                    row_count = count;
-                                    //TODO this is horribly inefficient
-                                    let mut builder = BinaryBuilder::new(row_count);
-                                    for j in 0..row_count {
-                                        let foo = b[j].slice(0, b[j].len());
-                                        let bytes: &[u8] = foo.data();
-                                        let str =
-                                            String::from_utf8(bytes.to_vec()).unwrap();
-                                        builder.append_string(&str).unwrap();
-                                    }
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
+                        DataType::Utf8 => {
+                            //TODO null handling - get_bytes returns Err for null values
+                            let mut builder = BinaryBuilder::new(rows.len());
+                            for row in &rows {
+                                let bytes = row.get_bytes(*i).unwrap();
+                                let s = String::from_utf8(bytes.data().to_vec()).unwrap();
+                                builder.append_string(&s).unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        other => panic!("unsupported data type {:?}", other),
};

println!("Adding array to batch");
+                    // ColumnReader::BoolColumnReader(ref mut _r) => {
+                    //     return Err(ExecutionError::NotImplemented(
+                    //         "unsupported column reader type (BOOL)".to_string(),
+                    //     ));
+                    // }
+                    // ColumnReader::Int32ColumnReader(ref mut r) => {
+                    //     let mut read_buffer: Vec<i32> =
+                    //         Vec::with_capacity(self.batch_size);
+                    //
+                    //     for _ in 0..self.batch_size {
+                    //         read_buffer.push(0);
+                    //     }
+                    //     r.read_
+                    //
+                    //     match r.read_batch(
+                    //         self.batch_size,
+                    //         None,
+                    //         None,
+                    //         &mut read_buffer,
+                    //     ) {
+                    //         //TODO this isn't handling null values
+                    //         Ok((count, _)) => {
+                    //             println!("Read {} rows", count);
+                    //             let mut builder = Int32Builder::new(self.batch_size);
+                    //             builder.append_slice(&read_buffer).unwrap();
+                    //             row_count = count;
+                    //             Arc::new(builder.finish())
+                    //         }
+                    //         _ => {
+                    //             return Err(ExecutionError::NotImplemented(format!(
+                    //                 "Error reading parquet batch (column {})",
+                    //                 i
+                    //             )));
+                    //         }
+                    //     }
+                    // }
+                    // ColumnReader::Int64ColumnReader(ref mut _r) => {
+                    //     return Err(ExecutionError::NotImplemented(
+                    //         "unsupported column reader type (INT64)".to_string(),
+                    //     ));
+                    // }
+                    // ColumnReader::Int96ColumnReader(ref mut _r) => {
+                    //     return Err(ExecutionError::NotImplemented(
+                    //         "unsupported column reader type (INT96)".to_string(),
+                    //     ));
+                    // }
+                    // ColumnReader::FloatColumnReader(ref mut _r) => {
+                    //     return Err(ExecutionError::NotImplemented(
+                    //         "unsupported column reader type (FLOAT)".to_string(),
+                    //     ));
+                    // }
+                    // ColumnReader::DoubleColumnReader(ref mut r) => {
+                    //     let mut builder = Float64Builder::new(self.batch_size);
+                    //     let mut read_buffer: Vec<f64> =
+                    //         Vec::with_capacity(self.batch_size);
+                    //     match r.read_batch(
+                    //         self.batch_size,
+                    //         None,
+                    //         None,
+                    //         &mut read_buffer,
+                    //     ) {
+                    //         //TODO this isn't handling null values
+                    //         Ok((count, _)) => {
+                    //             builder.append_slice(&read_buffer).unwrap();
+                    //             row_count = count;
+                    //             Arc::new(builder.finish())
+                    //         }
+                    //         _ => {
+                    //             return Err(ExecutionError::NotImplemented(format!(
+                    //                 "Error reading parquet batch (column {})",
+                    //                 i
+                    //             )));
+                    //         }
+                    //     }
+                    // }
+                    // ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
+                    //     return Err(ExecutionError::NotImplemented(
+                    //         "unsupported column reader type (FixedLenByteArray)"
+                    //             .to_string(),
+                    //     ));
+                    // }
+                    // ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                    //     let mut b: Vec<ByteArray> =
+                    //         Vec::with_capacity(self.batch_size);
+                    //     for _ in 0..self.batch_size {
+                    //         b.push(ByteArray::default());
+                    //     }
+                    //     match r.read_batch(self.batch_size, None, None, &mut b) {
+                    //         //TODO this isn't handling null values
+                    //         Ok((count, _)) => {
+                    //             row_count = count;
+                    //             //TODO this is horribly inefficient
+                    //             let mut builder = BinaryBuilder::new(row_count);
+                    //             for j in 0..row_count {
+                    //                 let foo = b[j].slice(0, b[j].len());
+                    //                 let bytes: &[u8] = foo.data();
+                    //                 let str =
+                    //                     String::from_utf8(bytes.to_vec()).unwrap();
+                    //                 builder.append_string(&str).unwrap();
+                    //             }
+                    //             Arc::new(builder.finish())
+                    //         }
+                    //         _ => {
+                    //             return Err(ExecutionError::NotImplemented(format!(
+                    //                 "Error reading parquet batch (column {})",
+                    //                 i
+                    //             )));
+                    //         }
+                    //     }
+                    // }
+                    // };
+                    //
batch.push(array);
}

println!("Loaded batch of {} rows", row_count);
println!("Loaded batch of {} rows", rows.len());

-                if row_count == 0 {
+                if rows.len() == 0 {
Ok(None)
} else {
-                    match &self.projection {
-                        Some(proj) => Ok(Some(RecordBatch::try_new(
-                            self.schema.projection(proj)?,
+                    Ok(Some(RecordBatch::try_new(
+                        self.schema.projection(&self.projection)?,
                        batch,
-                        )?)),
-                        None => {
-                            Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
-                        }
-                    }
+                    )?))
}
}
_ => Ok(None),
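Taken together, the new load_batch buffers up to batch_size rows and then pivots them into one Arrow array per projected column. A self-contained sketch of that pivot, stripped of the surrounding iterator state (the helper name and the assumption of non-null INT32/UTF8 columns are mine, not the commit's):

use std::sync::Arc;

use arrow::array::{Array, BinaryBuilder, Int32Builder};
use arrow::datatypes::{DataType, Schema};
use parquet::record::{Row, RowAccessor};

/// Pivot a buffered batch of Parquet rows into columnar Arrow arrays.
/// Illustrative helper only: assumes every projected column is a
/// non-null INT32 or UTF8 value, as the commit currently does.
fn rows_to_columns(schema: &Schema, projection: &[usize], rows: &[Row]) -> Vec<Arc<Array>> {
    let mut columns: Vec<Arc<Array>> = Vec::with_capacity(projection.len());
    for &i in projection {
        let array: Arc<Array> = match schema.field(i).data_type() {
            DataType::Int32 => {
                let mut builder = Int32Builder::new(rows.len());
                for row in rows {
                    // get_int returns Err for nulls or mismatched types,
                    // so this panics on anything but a non-null INT32
                    builder.append(row.get_int(i).unwrap()).unwrap();
                }
                Arc::new(builder.finish())
            }
            DataType::Utf8 => {
                let mut builder = BinaryBuilder::new(rows.len());
                for row in rows {
                    // copy the raw bytes out of the row and re-validate as UTF-8
                    let bytes = row.get_bytes(i).unwrap();
                    let s = String::from_utf8(bytes.data().to_vec()).unwrap();
                    builder.append_string(&s).unwrap();
                }
                Arc::new(builder.finish())
            }
            other => panic!("unsupported data type {:?}", other),
        };
        columns.push(array);
    }
    columns
}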
