diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 6ab2a14822e9b..6546f187fb280 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,14 +22,12 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use arrow::array::Array;
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::RecordBatch;
 
-use parquet::basic;
 use parquet::column::reader::*;
 use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, ScanResult, Table};
 use crate::execution::error::{ExecutionError, Result};
@@ -37,6 +35,7 @@ use arrow::builder::BooleanBuilder;
 use arrow::builder::Int64Builder;
 use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
 use parquet::data_type::Int96;
+use parquet::reader::schema::parquet_to_arrow_schema;
 
 pub struct ParquetTable {
     filename: String,
@@ -87,44 +86,36 @@ impl ParquetFile {
         let reader = SerializedFileReader::new(file)?;
         let metadata = reader.metadata();
 
-        let file_type = to_arrow(metadata.file_metadata().schema())?;
-
-        match file_type.data_type() {
-            DataType::Struct(fields) => {
-                let schema = Schema::new(fields.clone());
-
-                let projection = match projection {
-                    Some(p) => p,
-                    None => {
-                        let mut p = Vec::with_capacity(schema.fields().len());
-                        for i in 0..schema.fields().len() {
-                            p.push(i);
-                        }
-                        p
-                    }
-                };
-
-                let projected_fields: Vec<Field> = projection
-                    .iter()
-                    .map(|i| schema.fields()[*i].clone())
-                    .collect();
-
-                let projected_schema = Arc::new(Schema::new(projected_fields));
-
-                Ok(ParquetFile {
-                    reader: reader,
-                    row_group_index: 0,
-                    schema: projected_schema,
-                    projection,
-                    batch_size: 64 * 1024,
-                    current_row_group: None,
-                    column_readers: vec![],
-                })
+        let schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?;
+
+        let projection = match projection {
+            Some(p) => p,
+            None => {
+                let mut p = Vec::with_capacity(schema.fields().len());
+                for i in 0..schema.fields().len() {
+                    p.push(i);
+                }
+                p
             }
-            _ => Err(ExecutionError::General(
-                "Failed to read Parquet schema".to_string(),
-            )),
-        }
+        };
+
+        let projected_fields: Vec<Field> = projection
+            .iter()
+            .map(|i| schema.fields()[*i].clone())
+            .collect();
+
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
+        Ok(ParquetFile {
+            reader,
+            row_group_index: 0,
+            schema: projected_schema,
+            projection,
+            batch_size: 64 * 1024,
+            current_row_group: None,
+            column_readers: vec![],
+        })
     }
 
     fn load_next_row_group(&mut self) -> Result<()> {
@@ -397,39 +388,6 @@ impl ParquetFile {
     }
 }
 
-fn to_arrow(t: &Type) -> Result<Field> {
-    match t {
-        Type::PrimitiveType {
-            basic_info,
-            physical_type,
-            ..
-        } => {
-            let arrow_type = match physical_type {
-                basic::Type::BOOLEAN => DataType::Boolean,
-                basic::Type::INT32 => DataType::Int32,
-                basic::Type::INT64 => DataType::Int64,
-                basic::Type::INT96 => DataType::Int64,
-                basic::Type::FLOAT => DataType::Float32,
-                basic::Type::DOUBLE => DataType::Float64,
-                basic::Type::BYTE_ARRAY => DataType::Utf8,
-                basic::Type::FIXED_LEN_BYTE_ARRAY => DataType::Utf8,
-            };
-
-            Ok(Field::new(basic_info.name(), arrow_type, false))
-        }
-        Type::GroupType { basic_info, fields } => Ok(Field::new(
-            basic_info.name(),
-            DataType::Struct(
-                fields
-                    .iter()
-                    .map(|f| to_arrow(f))
-                    .collect::<Result<Vec<Field>>>()?,
-            ),
-            false,
-        )),
-    }
-}
-
 impl RecordBatchIterator for ParquetFile {
     fn schema(&self) -> &Arc<Schema> {
         &self.schema
diff --git a/rust/parquet/src/reader/schema.rs b/rust/parquet/src/reader/schema.rs
index 34276a2d5633f..bf8c43d6cfa05 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -177,11 +177,12 @@ impl ParquetTypeConverter {
             PhysicalType::BOOLEAN => Ok(DataType::Boolean),
             PhysicalType::INT32 => self.to_int32(),
             PhysicalType::INT64 => self.to_int64(),
+            PhysicalType::INT96 => self.to_int64(),
             PhysicalType::FLOAT => Ok(DataType::Float32),
             PhysicalType::DOUBLE => Ok(DataType::Float64),
             PhysicalType::BYTE_ARRAY => self.to_byte_array(),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet type {}",
+                "Unable to convert parquet physical type {}",
                 other
             ))),
         }
@@ -197,7 +198,7 @@ impl ParquetTypeConverter {
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT32 logical type {}",
                 other
             ))),
         }
@@ -209,7 +210,7 @@ impl ParquetTypeConverter {
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT64 logical type {}",
                 other
             ))),
         }
@@ -217,9 +218,10 @@ impl ParquetTypeConverter {
 
     fn to_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
+            LogicalType::NONE => Ok(DataType::Utf8),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet BYTE_ARRAY logical type {}",
                 other
             ))),
         }
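
Reviewer note: below is a minimal sketch, not part of the patch, showing how the new conversion entry point is exercised the same way `ParquetFile::open` now uses it. The path `test.parquet` is a placeholder, and the import path `parquet::reader::schema` assumes the module layout introduced in this diff.

// Sketch only: read a Parquet file's metadata and convert its schema to an
// Arrow schema via parquet_to_arrow_schema, then print the resulting fields.
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::reader::schema::parquet_to_arrow_schema;

fn main() {
    // "test.parquet" is a placeholder file name for illustration.
    let file = File::open("test.parquet").expect("failed to open file");
    let reader = SerializedFileReader::new(file).expect("failed to create reader");

    // The converter takes the schema descriptor pointer rather than the raw
    // root `Type`, so callers no longer match on struct vs. primitive nodes.
    let schema =
        parquet_to_arrow_schema(reader.metadata().file_metadata().schema_descr_ptr())
            .expect("failed to convert schema");

    for field in schema.fields() {
        println!("{}: {:?}", field.name(), field.data_type());
    }
}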