Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent 10710a2 commit ff3e5b7
Showing 1 changed file with 55 additions and 6 deletions.
61 changes: 55 additions & 6 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,16 @@ impl ParquetFile {
let mut row_count = 0;
for i in 0..self.column_readers.len() {
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (BOOL)".to_string(),
));
}
ColumnReader::Int32ColumnReader(ref mut r) => {
let mut builder = Int32Builder::new(self.batch_size);
let mut read_buffer: Vec<i32> =
Vec::with_capacity(self.batch_size);

match r.read_batch(
self.batch_size,
None,
Expand All @@ -156,6 +162,8 @@ impl ParquetFile {
) {
//TODO this isn't handling null values
Ok((count, _)) => {
println!("Read {} rows", count);

builder.append_slice(&read_buffer).unwrap();
row_count = count;
Arc::new(builder.finish())
Expand All @@ -168,6 +176,21 @@ impl ParquetFile {
}
}
}
ColumnReader::Int64ColumnReader(ref mut r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT64)".to_string(),
));
}
ColumnReader::Int96ColumnReader(ref mut r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT96)".to_string(),
));
}
ColumnReader::FloatColumnReader(ref mut r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FLOAT)".to_string(),
));
}
ColumnReader::DoubleColumnReader(ref mut r) => {
let mut builder = Float64Builder::new(self.batch_size);
let mut read_buffer: Vec<f64> =
Expand All @@ -192,6 +215,12 @@ impl ParquetFile {
}
}
}
ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FixedLenByteArray)"
.to_string(),
));
}
ColumnReader::ByteArrayColumnReader(ref mut r) => {
let mut b: Vec<ByteArray> =
Vec::with_capacity(self.batch_size);
Expand Down Expand Up @@ -221,17 +250,13 @@ impl ParquetFile {
}
}
}
_ => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type".to_string(),
));
}
};

println!("Adding array to batch");
batch.push(array);
}

// println!("Loaded batch of {} rows", row_count);
println!("Loaded batch of {} rows", row_count);

if row_count == 0 {
Ok(None)
Expand Down Expand Up @@ -308,3 +333,27 @@ impl RecordBatchIterator for ParquetFile {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Smoke test: scans the first column of `alltypes_plain.parquet` and
    /// checks the shape of the first record batch returned by the scan.
    ///
    /// The data file is looked up in the directory named by the
    /// `PARQUET_TEST_DATA` environment variable; the test panics with an
    /// explanatory message when that variable is not set.
    #[test]
    fn read_parquet_file() {
        // Fail with an actionable message instead of a bare `NotPresent`
        // panic when the test-data location has not been configured.
        let testdata = env::var("PARQUET_TEST_DATA").expect(
            "PARQUET_TEST_DATA must be set to the directory containing the parquet test files",
        );
        let filename = format!("{}/alltypes_plain.parquet", testdata);

        let table = ParquetTable::new(&filename);

        println!("{:?}", table.schema());

        // Project only column 0 and request batches of up to 1024 rows.
        let projection = Some(vec![0]);
        let scan = table
            .scan(&projection, 1024)
            .expect("scanning the parquet table failed");
        let mut it = scan.borrow_mut();
        let batch = it
            .next()
            .expect("fetching the first batch from the scan failed")
            .expect("the scan did not yield a first batch");

        assert_eq!(1, batch.num_columns());
        // NOTE(review): a single row is expected here although the batch size
        // is 1024 -- confirm this matches the row count of the test file.
        assert_eq!(1, batch.num_rows());
    }
}

0 comments on commit ff3e5b7

Please sign in to comment.