Skip to content

Commit

Permalink
first parquet test passes
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent ff3e5b7 commit 3a412b1
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,19 @@ impl ParquetFile {
let mut row_count = 0;
for i in 0..self.column_readers.len() {
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut r) => {
ColumnReader::BoolColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (BOOL)".to_string(),
));
}
ColumnReader::Int32ColumnReader(ref mut r) => {
let mut builder = Int32Builder::new(self.batch_size);
let mut read_buffer: Vec<i32> =
Vec::with_capacity(self.batch_size);

for _ in 0..self.batch_size {
read_buffer.push(0);
}

match r.read_batch(
self.batch_size,
None,
Expand All @@ -163,7 +166,7 @@ impl ParquetFile {
//TODO this isn't handling null values
Ok((count, _)) => {
println!("Read {} rows", count);

let mut builder = Int32Builder::new(self.batch_size);
builder.append_slice(&read_buffer).unwrap();
row_count = count;
Arc::new(builder.finish())
Expand All @@ -176,17 +179,17 @@ impl ParquetFile {
}
}
}
ColumnReader::Int64ColumnReader(ref mut r) => {
ColumnReader::Int64ColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT64)".to_string(),
));
}
ColumnReader::Int96ColumnReader(ref mut r) => {
ColumnReader::Int96ColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT96)".to_string(),
));
}
ColumnReader::FloatColumnReader(ref mut r) => {
ColumnReader::FloatColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FLOAT)".to_string(),
));
Expand Down Expand Up @@ -215,7 +218,7 @@ impl ParquetFile {
}
}
}
ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FixedLenByteArray)"
.to_string(),
Expand Down Expand Up @@ -261,7 +264,15 @@ impl ParquetFile {
if row_count == 0 {
Ok(None)
} else {
Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
match &self.projection {
Some(proj) => Ok(Some(RecordBatch::try_new(
self.schema.projection(proj)?,
batch,
)?)),
None => {
Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
}
}
}
}
_ => Ok(None),
Expand Down Expand Up @@ -337,10 +348,11 @@ impl RecordBatchIterator for ParquetFile {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int32Array;
use std::env;

#[test]
fn read_parquet_file() {
fn read_read_i32_column() {
let testdata = env::var("PARQUET_TEST_DATA").unwrap();
let filename = format!("{}/alltypes_plain.parquet", testdata);

Expand All @@ -354,6 +366,21 @@ mod tests {
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
assert_eq!(1, batch.num_rows());
assert_eq!(64 * 1024, batch.num_rows());

let array = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let mut values: Vec<i32> = vec![];
for i in 0..16 {
values.push(array.value(i));
}

assert_eq!(
"[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]",
format!("{:?}", values)
);
}
}

0 comments on commit 3a412b1

Please sign in to comment.