use parquet::reader::schema::parquet_to_arrow_schema
andygrove committed Mar 14, 2019
1 parent c56510e commit 6457c36
Showing 2 changed files with 37 additions and 77 deletions.
104 changes: 31 additions & 73 deletions rust/datafusion/src/datasource/parquet.rs
@@ -22,21 +22,20 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use arrow::array::Array;
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
 
-use parquet::basic;
 use parquet::column::reader::*;
 use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, ScanResult, Table};
 use crate::execution::error::{ExecutionError, Result};
 use arrow::builder::BooleanBuilder;
 use arrow::builder::Int64Builder;
 use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
 use parquet::data_type::Int96;
+use parquet::reader::schema::parquet_to_arrow_schema;
 
 pub struct ParquetTable {
     filename: String,
@@ -87,44 +86,36 @@ impl ParquetFile {
         let reader = SerializedFileReader::new(file)?;
 
         let metadata = reader.metadata();
-        let file_type = to_arrow(metadata.file_metadata().schema())?;
-
-        match file_type.data_type() {
-            DataType::Struct(fields) => {
-                let schema = Schema::new(fields.clone());
-
-                let projection = match projection {
-                    Some(p) => p,
-                    None => {
-                        let mut p = Vec::with_capacity(schema.fields().len());
-                        for i in 0..schema.fields().len() {
-                            p.push(i);
-                        }
-                        p
-                    }
-                };
-
-                let projected_fields: Vec<Field> = projection
-                    .iter()
-                    .map(|i| schema.fields()[*i].clone())
-                    .collect();
-
-                let projected_schema = Arc::new(Schema::new(projected_fields));
-
-                Ok(ParquetFile {
-                    reader: reader,
-                    row_group_index: 0,
-                    schema: projected_schema,
-                    projection,
-                    batch_size: 64 * 1024,
-                    current_row_group: None,
-                    column_readers: vec![],
-                })
-            }
-            _ => Err(ExecutionError::General(
-                "Failed to read Parquet schema".to_string(),
-            )),
-        }
+        let schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?;
+
+        let projection = match projection {
+            Some(p) => p,
+            None => {
+                let mut p = Vec::with_capacity(schema.fields().len());
+                for i in 0..schema.fields().len() {
+                    p.push(i);
+                }
+                p
+            }
+        };
+
+        let projected_fields: Vec<Field> = projection
+            .iter()
+            .map(|i| schema.fields()[*i].clone())
+            .collect();
+
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
+        Ok(ParquetFile {
+            reader: reader,
+            row_group_index: 0,
+            schema: projected_schema,
+            projection,
+            batch_size: 64 * 1024,
+            current_row_group: None,
+            column_readers: vec![],
+        })
     }
 
     fn load_next_row_group(&mut self) -> Result<()> {
Expand Down Expand Up @@ -397,39 +388,6 @@ impl ParquetFile {
}
}

fn to_arrow(t: &Type) -> Result<Field> {
match t {
Type::PrimitiveType {
basic_info,
physical_type,
..
} => {
let arrow_type = match physical_type {
basic::Type::BOOLEAN => DataType::Boolean,
basic::Type::INT32 => DataType::Int32,
basic::Type::INT64 => DataType::Int64,
basic::Type::INT96 => DataType::Int64,
basic::Type::FLOAT => DataType::Float32,
basic::Type::DOUBLE => DataType::Float64,
basic::Type::BYTE_ARRAY => DataType::Utf8,
basic::Type::FIXED_LEN_BYTE_ARRAY => DataType::Utf8,
};

Ok(Field::new(basic_info.name(), arrow_type, false))
}
Type::GroupType { basic_info, fields } => Ok(Field::new(
basic_info.name(),
DataType::Struct(
fields
.iter()
.map(|f| to_arrow(f))
.collect::<Result<Vec<Field>>>()?,
),
false,
)),
}
}

impl RecordBatchIterator for ParquetFile {
fn schema(&self) -> &Arc<Schema> {
&self.schema
Expand Down
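For reference, a minimal sketch of how the shared conversion entry point is invoked outside DataFusion, using only the parquet crate APIs visible in this diff (SerializedFileReader, FileReader::metadata, parquet_to_arrow_schema); the helper name print_parquet_schema is hypothetical:

    use std::error::Error;
    use std::fs::File;

    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::reader::schema::parquet_to_arrow_schema;

    // Hypothetical helper: open a Parquet file and print the Arrow schema
    // produced by the shared converter.
    fn print_parquet_schema(path: &str) -> Result<(), Box<dyn Error>> {
        let file = File::open(path)?;
        let reader = SerializedFileReader::new(file)?;
        // Same call that ParquetFile::new now makes in the diff above.
        let schema = parquet_to_arrow_schema(
            reader.metadata().file_metadata().schema_descr_ptr(),
        )?;
        for field in schema.fields() {
            println!("{}: {:?}", field.name(), field.data_type());
        }
        Ok(())
    }

Since this is the same call ParquetFile::new now makes, DataFusion and any other consumer of the parquet crate derive identical Arrow schemas from a given file.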
10 changes: 6 additions & 4 deletions rust/parquet/src/reader/schema.rs
@@ -177,11 +177,12 @@ impl ParquetTypeConverter
             PhysicalType::BOOLEAN => Ok(DataType::Boolean),
             PhysicalType::INT32 => self.to_int32(),
             PhysicalType::INT64 => self.to_int64(),
+            PhysicalType::INT96 => self.to_int64(),
             PhysicalType::FLOAT => Ok(DataType::Float32),
             PhysicalType::DOUBLE => Ok(DataType::Float64),
             PhysicalType::BYTE_ARRAY => self.to_byte_array(),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet type {}",
+                "Unable to convert parquet type d {}",
                 other
             ))),
         }
@@ -197,7 +198,7 @@ impl ParquetTypeConverter
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet logical type a {}",
                 other
             ))),
         }
@@ -209,17 +210,18 @@ impl ParquetTypeConverter
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet logical type b {}",
                 other
             ))),
         }
     }
 
     fn to_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
+            LogicalType::NONE => Ok(DataType::Utf8),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet logical type c {}",
                 other
             ))),
         }
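Taken together, these converter changes mean INT96 columns (commonly used for legacy timestamp data) now map to Int64 instead of hitting the fallback error arm, and BYTE_ARRAY columns with no logical-type annotation now map to Utf8. A minimal sketch exercising both new arms, assuming the parquet crate's parse_message_type and SchemaDescriptor APIs and an Rc-based SchemaDescPtr as in this version of the crate; the function name convert_example is made up:

    use std::error::Error;
    use std::rc::Rc;

    use parquet::reader::schema::parquet_to_arrow_schema;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::SchemaDescriptor;

    // Hypothetical check: convert a schema that exercises both new mappings.
    fn convert_example() -> Result<(), Box<dyn Error>> {
        let message = "
        message test {
            required int96 ts;
            required binary payload;
        }";
        let parquet_type = parse_message_type(message)?;
        let descriptor = Rc::new(SchemaDescriptor::new(Rc::new(parquet_type)));
        let arrow_schema = parquet_to_arrow_schema(descriptor)?;
        // Expected: ts -> Int64 (via the new INT96 arm),
        //           payload -> Utf8 (via the new LogicalType::NONE arm).
        println!("{:?}", arrow_schema);
        Ok(())
    }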
