Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent eaddafb commit f46e6f7
Showing 1 changed file with 7 additions and 118 deletions.
125 changes: 7 additions & 118 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,136 +172,25 @@ impl ParquetFile {
for i in &self.projection {
let array: Arc<Array> = match self.schema.field(*i).data_type() {
DataType::Int32 => {
//TODO null handling
let mut builder = Int32Builder::new(rows.len());
for row in &rows {
builder.push()
//TODO null handling
builder.append_value(row.get_int(*i).unwrap()).unwrap();
}
Arc::new(builder.finish())
}
DataType::Utf8 => {
//TODO null handling
let mut builder = BinaryBuilder::new(rows.len());
for row in &rows {
//TODO null handling
let bytes = row.get_bytes(*i).unwrap();
builder.append_string(&String::from_utf8(bytes.data().to_vec()).unwrap()).unwrap();
}
Arc::new(builder.finish())
}
_ => panic!()
other => return Err(ExecutionError::NotImplemented(
format!("unsupported column reader type ({:?})", other)))
};
// ColumnReader::BoolColumnReader(ref mut _r) => {
// return Err(ExecutionError::NotImplemented(
// "unsupported column reader type (BOOL)".to_string(),
// ));
// }
// ColumnReader::Int32ColumnReader(ref mut r) => {
// let mut read_buffer: Vec<i32> =
// Vec::with_capacity(self.batch_size);
//
// for _ in 0..self.batch_size {
// read_buffer.push(0);
// }
// r.read_
//
// match r.read_batch(
// self.batch_size,
// None,
// None,
// &mut read_buffer,
// ) {
// //TODO this isn't handling null values
// Ok((count, _)) => {
// println!("Read {} rows", count);
// let mut builder = Int32Builder::new(self.batch_size);
// builder.append_slice(&read_buffer).unwrap();
// row_count = count;
// Arc::new(builder.finish())
// }
// _ => {
// return Err(ExecutionError::NotImplemented(format!(
// "Error reading parquet batch (column {})",
// i
// )));
// }
// }
// }
// ColumnReader::Int64ColumnReader(ref mut _r) => {
// return Err(ExecutionError::NotImplemented(
// "unsupported column reader type (INT64)".to_string(),
// ));
// }
// ColumnReader::Int96ColumnReader(ref mut _r) => {
// return Err(ExecutionError::NotImplemented(
// "unsupported column reader type (INT96)".to_string(),
// ));
// }
// ColumnReader::FloatColumnReader(ref mut _r) => {
// return Err(ExecutionError::NotImplemented(
// "unsupported column reader type (FLOAT)".to_string(),
// ));
// }
// ColumnReader::DoubleColumnReader(ref mut r) => {
// let mut builder = Float64Builder::new(self.batch_size);
// let mut read_buffer: Vec<f64> =
// Vec::with_capacity(self.batch_size);
// match r.read_batch(
// self.batch_size,
// None,
// None,
// &mut read_buffer,
// ) {
// //TODO this isn't handling null values
// Ok((count, _)) => {
// builder.append_slice(&read_buffer).unwrap();
// row_count = count;
// Arc::new(builder.finish())
// }
// _ => {
// return Err(ExecutionError::NotImplemented(format!(
// "Error reading parquet batch (column {})",
// i
// )));
// }
// }
// }
// ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
// return Err(ExecutionError::NotImplemented(
// "unsupported column reader type (FixedLenByteArray)"
// .to_string(),
// ));
// }
// ColumnReader::ByteArrayColumnReader(ref mut r) => {
// let mut b: Vec<ByteArray> =
// Vec::with_capacity(self.batch_size);
// for _ in 0..self.batch_size {
// b.push(ByteArray::default());
// }
// match r.read_batch(self.batch_size, None, None, &mut b) {
// //TODO this isn't handling null values
// Ok((count, _)) => {
// row_count = count;
// //TODO this is horribly inefficient
// let mut builder = BinaryBuilder::new(row_count);
// for j in 0..row_count {
// let foo = b[j].slice(0, b[j].len());
// let bytes: &[u8] = foo.data();
// let str =
// String::from_utf8(bytes.to_vec()).unwrap();
// builder.append_string(&str).unwrap();
// }
// Arc::new(builder.finish())
// }
// _ => {
// return Err(ExecutionError::NotImplemented(format!(
// "Error reading parquet batch (column {})",
// i
// )));
// }
// }
// }
// };
//
batch.push(array);
}

Expand Down Expand Up @@ -402,7 +291,7 @@ mod tests {
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
assert_eq!(64 * 1024, batch.num_rows());
assert_eq!(8, batch.num_rows());

let array = batch
.column(0)
Expand All @@ -415,7 +304,7 @@ mod tests {
}

assert_eq!(
"[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]",
"[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 9, 0, 1, 0, 0, 0]",
format!("{:?}", values)
);
}
Expand Down

0 comments on commit f46e6f7

Please sign in to comment.