diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs
index 1a22a2370e69a..5688fb5fed85e 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -18,6 +18,7 @@
 pub mod csv;
 pub mod datasource;
 pub mod memory;
+pub mod parquet;
 
 pub use self::csv::{CsvBatchIterator, CsvFile};
 pub use self::datasource::{RecordBatchIterator, ScanResult, Table};
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
new file mode 100644
index 0000000000000..3fb4a3c07f460
--- /dev/null
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -0,0 +1,610 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet data source
+
+use std::fs::File;
+use std::string::String;
+use std::sync::{Arc, Mutex};
+
+use arrow::array::{Array, PrimitiveArray};
+use arrow::builder::{BinaryBuilder, PrimitiveBuilder, TimestampNanosecondBuilder};
+use arrow::datatypes::*;
+use arrow::record_batch::RecordBatch;
+
+use parquet::column::reader::*;
+use parquet::data_type::{ByteArray, Int96};
+use parquet::file::reader::*;
+use parquet::reader::schema::parquet_to_arrow_schema;
+
+use crate::datasource::{RecordBatchIterator, ScanResult, Table};
+use crate::execution::error::{ExecutionError, Result};
+
+pub struct ParquetTable {
+    filename: String,
+    schema: Arc<Schema>,
+}
+
+impl ParquetTable {
+    pub fn try_new(filename: &str) -> Result<Self> {
+        let file = File::open(filename)?;
+        let parquet_file = ParquetFile::open(file, None, 0)?;
+        let schema = parquet_file.projection_schema.clone();
+        Ok(Self {
+            filename: filename.to_string(),
+            schema,
+        })
+    }
+}
+
+impl Table for ParquetTable {
+    fn schema(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Vec<ScanResult>> {
+        let file = File::open(self.filename.clone())?;
+        let parquet_file = ParquetFile::open(file, projection.clone(), batch_size)?;
+        Ok(vec![Arc::new(Mutex::new(parquet_file))])
+    }
+}
+
+pub struct ParquetFile {
+    reader: SerializedFileReader<File>,
+    /// Projection expressed as column indices into underlying parquet reader
+    projection: Vec<usize>,
+    /// The schema of the projection
+    projection_schema: Arc<Schema>,
+    batch_size: usize,
+    row_group_index: usize,
+    current_row_group: Option<Box<RowGroupReader>>,
+    column_readers: Vec<ColumnReader>,
+}
+
+macro_rules! read_binary_column {
read_binary_column { + ($SELF:ident, $R:ident, $INDEX:expr) => {{ + let mut read_buffer: Vec = + vec![ByteArray::default(); $SELF.batch_size]; + let mut def_levels: Vec = vec![0; $SELF.batch_size]; + let (_, levels_read) = $R.read_batch( + $SELF.batch_size, + Some(&mut def_levels), + None, + &mut read_buffer, + )?; + let mut builder = BinaryBuilder::new(levels_read); + let mut value_index = 0; + for i in 0..levels_read { + if def_levels[i] > 0 { + builder.append_string( + &String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap(), + )?; + value_index += 1; + } else { + builder.append_null()?; + } + } + Arc::new(builder.finish()) + }}; +} + +trait ArrowReader +where + T: ArrowPrimitiveType, +{ + fn read( + &mut self, + batch_size: usize, + is_nullable: bool, + ) -> Result>>; +} + +impl ArrowReader for ColumnReaderImpl

+where + A: ArrowPrimitiveType, + P: parquet::data_type::DataType, + P::T: std::convert::From, + A::Native: std::convert::From, +{ + fn read( + &mut self, + batch_size: usize, + is_nullable: bool, + ) -> Result>> { + // create read buffer + let mut read_buffer: Vec = vec![A::default_value().into(); batch_size]; + + if is_nullable { + let mut def_levels: Vec = vec![0; batch_size]; + + let (values_read, levels_read) = self.read_batch( + batch_size, + Some(&mut def_levels), + None, + &mut read_buffer, + )?; + let mut builder = PrimitiveBuilder::::new(levels_read); + let converted_buffer: Vec = + read_buffer.into_iter().map(|v| v.into()).collect(); + if values_read == levels_read { + builder.append_slice(&converted_buffer[0..values_read])?; + } else { + let mut value_index = 0; + for i in 0..def_levels.len() { + if def_levels[i] != 0 { + builder.append_value(converted_buffer[value_index].into())?; + value_index += 1; + } else { + builder.append_null()?; + } + } + } + Ok(Arc::new(builder.finish())) + } else { + let (values_read, _) = + self.read_batch(batch_size, None, None, &mut read_buffer)?; + + let mut builder = PrimitiveBuilder::::new(values_read); + let converted_buffer: Vec = + read_buffer.into_iter().map(|v| v.into()).collect(); + builder.append_slice(&converted_buffer[0..values_read])?; + Ok(Arc::new(builder.finish())) + } + } +} + +impl ParquetFile { + pub fn open( + file: File, + projection: Option>, + batch_size: usize, + ) -> Result { + let reader = SerializedFileReader::new(file)?; + + let metadata = reader.metadata(); + let schema = + parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?; + + // even if we aren't referencing structs or lists in our projection, column reader + // indexes will be off until we have support for nested schemas + for i in 0..schema.fields().len() { + match schema.field(i).data_type() { + DataType::List(_) => { + return Err(ExecutionError::NotImplemented( + "Parquet datasource does not support LIST".to_string(), + )); + } + DataType::Struct(_) => { + return Err(ExecutionError::NotImplemented( + "Parquet datasource does not support STRUCT".to_string(), + )); + } + _ => {} + } + } + + let projection = match projection { + Some(p) => p, + None => { + let mut p = Vec::with_capacity(schema.fields().len()); + for i in 0..schema.fields().len() { + p.push(i); + } + p + } + }; + + let projected_schema = schema_projection(&schema, &projection)?; + + Ok(ParquetFile { + reader: reader, + row_group_index: 0, + projection_schema: projected_schema, + projection, + batch_size, + current_row_group: None, + column_readers: vec![], + }) + } + + fn load_next_row_group(&mut self) -> Result<()> { + if self.row_group_index < self.reader.num_row_groups() { + let reader = self.reader.get_row_group(self.row_group_index)?; + + self.column_readers.clear(); + self.column_readers = Vec::with_capacity(self.projection.len()); + + for i in 0..self.projection.len() { + self.column_readers + .push(reader.get_column_reader(self.projection[i])?); + } + + self.current_row_group = Some(reader); + self.row_group_index += 1; + + Ok(()) + } else { + Err(ExecutionError::General( + "Attempt to read past final row group".to_string(), + )) + } + } + + fn load_batch(&mut self) -> Result> { + match &self.current_row_group { + Some(reader) => { + let mut batch: Vec> = Vec::with_capacity(reader.num_columns()); + for i in 0..self.column_readers.len() { + let is_nullable = self.schema().field(i).is_nullable(); + let array: Arc = match self.column_readers[i] { + 
+                        ColumnReader::BoolColumnReader(ref mut r) => {
+                            ArrowReader::<BooleanType>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int32ColumnReader(ref mut r) => {
+                            ArrowReader::<Int32Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int64ColumnReader(ref mut r) => {
+                            ArrowReader::<Int64Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int96ColumnReader(ref mut r) => {
+                            let mut read_buffer: Vec<Int96> =
+                                vec![Int96::new(); self.batch_size];
+
+                            let mut def_levels: Vec<i16> = vec![0; self.batch_size];
+                            let (_, levels_read) = r.read_batch(
+                                self.batch_size,
+                                Some(&mut def_levels),
+                                None,
+                                &mut read_buffer,
+                            )?;
+
+                            let mut builder =
+                                TimestampNanosecondBuilder::new(levels_read);
+                            let mut value_index = 0;
+                            for i in 0..levels_read {
+                                if def_levels[i] > 0 {
+                                    builder.append_value(convert_int96_timestamp(
+                                        read_buffer[value_index].data(),
+                                    ))?;
+                                    value_index += 1;
+                                } else {
+                                    builder.append_null()?;
+                                }
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        ColumnReader::FloatColumnReader(ref mut r) => {
+                            ArrowReader::<Float32Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::DoubleColumnReader(ref mut r) => {
+                            ArrowReader::<Float64Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
+                            read_binary_column!(self, r, i)
+                        }
+                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                            read_binary_column!(self, r, i)
+                        }
+                    };
+
+                    batch.push(array);
+                }
+
+                if batch.len() == 0 || batch[0].data().len() == 0 {
+                    Ok(None)
+                } else {
+                    Ok(Some(RecordBatch::try_new(
+                        self.projection_schema.clone(),
+                        batch,
+                    )?))
+                }
+            }
+            _ => Ok(None),
+        }
+    }
+}
+
+/// Create a new schema by applying a projection to this schema's fields
+fn schema_projection(schema: &Schema, projection: &[usize]) -> Result<Arc<Schema>> {
+    let mut fields: Vec<Field> = Vec::with_capacity(projection.len());
+    for i in projection {
+        if *i < schema.fields().len() {
+            fields.push(schema.field(*i).clone());
+        } else {
+            return Err(ExecutionError::InvalidColumn(format!(
+                "Invalid column index {} in projection",
+                i
+            )));
+        }
+    }
+    Ok(Arc::new(Schema::new(fields)))
+}
+
+/// Convert a Parquet INT96 timestamp (Julian day plus nanoseconds of day) to an
+/// Arrow timestamp in nanoseconds
+fn convert_int96_timestamp(v: &[u32]) -> i64 {
+    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+    const SECONDS_PER_DAY: i64 = 86_400;
+    const MILLIS_PER_SECOND: i64 = 1_000;
+
+    let day = v[2] as i64;
+    let nanoseconds = ((v[1] as i64) << 32) + v[0] as i64;
+    let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
+    seconds * MILLIS_PER_SECOND * 1_000_000 + nanoseconds
+}
+
+impl RecordBatchIterator for ParquetFile {
+    fn schema(&self) -> &Arc<Schema> {
+        &self.projection_schema
+    }
+
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        // advance the row group reader if necessary
+        if self.current_row_group.is_none() {
+            self.load_next_row_group()?;
+            self.load_batch()
+        } else {
+            match self.load_batch() {
+                Ok(Some(b)) => Ok(Some(b)),
+                Ok(None) => {
+                    if self.row_group_index < self.reader.num_row_groups() {
+                        self.load_next_row_group()?;
+                        self.load_batch()
+                    } else {
+                        Ok(None)
+                    }
+                }
+                Err(e) => Err(e),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampNanosecondArray,
+    };
+    use std::env;
+
+    #[test]
+    fn read_small_batches() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = None;
+        let scan = table.scan(&projection, 2).unwrap();
+        let mut it = scan[0].lock().unwrap();
+
+        let mut count = 0;
+        while let Some(batch) = it.next().unwrap() {
+            assert_eq!(11, batch.num_columns());
+            assert_eq!(2, batch.num_rows());
+            count += 1;
+        }
+
+        // we should have seen 4 batches of 2 rows
+        assert_eq!(4, count);
+    }
+
+    #[test]
+    fn read_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = None;
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+    }
+
+    #[test]
+    fn read_bool_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![1]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        let mut values: Vec<bool> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[true, false, true, false, true, false, true, false]",
+            format!("{:?}", values)
+        );
+    }
+
+    #[test]
+    fn read_i32_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![0]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let mut values: Vec<i32> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
+    }
+
+    #[test]
+    fn read_i96_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![10]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+        let mut values: Vec<i64> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{:?}", values));
+    }
+
+    #[test]
+    fn read_f32_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![6]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float32Array>()
+            .unwrap();
+        let mut values: Vec<f32> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
+            format!("{:?}", values)
+        );
+    }
+
+    #[test]
+    fn read_f64_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![7]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();
+        let mut values: Vec<f64> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!(
+            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
+            format!("{:?}", values)
+        );
+    }
+
+    #[test]
+    fn read_utf8_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![9]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        let mut values: Vec<String> = vec![];
+        for i in 0..batch.num_rows() {
+            let str: String = String::from_utf8(array.value(i).to_vec()).unwrap();
+            values.push(str);
+        }
+
+        assert_eq!(
+            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
+            format!("{:?}", values)
+        );
+    }
+
+    fn load_table(name: &str) -> Box<Table> {
+        let testdata = env::var("PARQUET_TEST_DATA").unwrap();
+        let filename = format!("{}/{}", testdata, name);
+        let table = ParquetTable::try_new(&filename).unwrap();
+        Box::new(table)
+    }
+}
diff --git a/rust/datafusion/src/execution/error.rs b/rust/datafusion/src/execution/error.rs
index 5b8d04d3dca34..92ce6d91c10a7 100644
--- a/rust/datafusion/src/execution/error.rs
+++ b/rust/datafusion/src/execution/error.rs
@@ -21,6 +21,7 @@ use std::io::Error;
 use std::result;
 
 use arrow::error::ArrowError;
+use parquet::errors::ParquetError;
 
 use sqlparser::sqlparser::ParserError;
 
@@ -35,6 +36,7 @@ pub enum ExecutionError {
     NotImplemented(String),
     InternalError(String),
     ArrowError(ArrowError),
+    ParquetError(ParquetError),
     ExecutionError(String),
 }
 
@@ -62,6 +64,12 @@ impl From<ArrowError> for ExecutionError {
     }
 }
 
+impl From<ParquetError> for ExecutionError {
+    fn from(e: ParquetError) -> Self {
+        ExecutionError::ParquetError(e)
+    }
+}
+
 impl From<ParserError> for ExecutionError {
     fn from(e: ParserError) -> Self {
         ExecutionError::ParserError(e)
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 954b2ee0fbf13..9c24a504bd394 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::cell::RefCell;
+use std::env;
 use std::rc::Rc;
 use std::sync::Arc;
 
@@ -25,11 +26,26 @@ extern crate datafusion;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
 
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::Table;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::relation::Relation;
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
 
+#[test]
+fn parquet_query() {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table(
+        "alltypes_plain",
+        load_parquet_table("alltypes_plain.parquet"),
+    );
+    let sql = "SELECT id, string_col FROM alltypes_plain";
+    let actual = execute(&mut ctx, sql);
+    let expected = "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
+    assert_eq!(expected, actual);
+}
+
 #[test]
 fn csv_query_with_predicate() {
     let mut ctx = ExecutionContext::new();
@@ -163,9 +179,17 @@ fn register_csv(
     ctx.register_csv(name, filename, &schema, true);
 }
 
+fn load_parquet_table(name: &str) -> Rc<Table> {
+    let testdata = env::var("PARQUET_TEST_DATA").unwrap();
+    let filename = format!("{}/{}", testdata, name);
+    let table = ParquetTable::try_new(&filename).unwrap();
+    Rc::new(table)
+}
+
 /// Execute query and return result set as tab delimited string
 fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
-    let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
     result_str(&results)
 }
 
diff --git a/rust/parquet/src/reader/schema.rs b/rust/parquet/src/reader/schema.rs
index 34276a2d5633f..5af07be7460f6 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -28,7 +28,8 @@ use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
 use crate::errors::{ParquetError::ArrowError, Result};
 use crate::schema::types::{SchemaDescPtr, Type, TypePtr};
 
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::TimeUnit;
+use arrow::datatypes::{DataType, DateUnit, Field, Schema};
 
 /// Convert parquet schema to arrow schema.
 pub fn parquet_to_arrow_schema(parquet_schema: SchemaDescPtr) -> Result<Schema> {
@@ -175,19 +176,20 @@ impl ParquetTypeConverter {
     fn to_primitive_type_inner(&self) -> Result<DataType> {
         match self.schema.get_physical_type() {
             PhysicalType::BOOLEAN => Ok(DataType::Boolean),
-            PhysicalType::INT32 => self.to_int32(),
-            PhysicalType::INT64 => self.to_int64(),
+            PhysicalType::INT32 => self.from_int32(),
+            PhysicalType::INT64 => self.from_int64(),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond)),
             PhysicalType::FLOAT => Ok(DataType::Float32),
             PhysicalType::DOUBLE => Ok(DataType::Float64),
-            PhysicalType::BYTE_ARRAY => self.to_byte_array(),
+            PhysicalType::BYTE_ARRAY => self.from_byte_array(),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet type {}",
+                "Unable to convert parquet physical type {}",
                 other
             ))),
         }
     }
 
-    fn to_int32(&self) -> Result<DataType> {
+    fn from_int32(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int32),
             LogicalType::UINT_8 => Ok(DataType::UInt8),
@@ -196,30 +198,40 @@ impl ParquetTypeConverter {
             LogicalType::INT_8 => Ok(DataType::Int8),
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
+            LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
+            LogicalType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT32 logical type {}",
                 other
             ))),
         }
     }
 
-    fn to_int64(&self) -> Result<DataType> {
+    fn from_int64(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int64),
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
+            LogicalType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
+            LogicalType::TIMESTAMP_MICROS => {
+                Ok(DataType::Timestamp(TimeUnit::Microsecond))
+            }
+            LogicalType::TIMESTAMP_MILLIS => {
+                Ok(DataType::Timestamp(TimeUnit::Millisecond))
+            }
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT64 logical type {}",
                 other
             ))),
         }
     }
 
-    fn to_byte_array(&self) -> Result<DataType> {
+    fn from_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
+            LogicalType::NONE => Ok(DataType::Utf8),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet BYTE_ARRAY logical type {}",
                 other
             ))),
         }
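
For reference, below is a minimal sketch of how the new Parquet data source is intended to be used end to end, based on the `parquet_query` test in `tests/sql.rs` above. The `PARQUET_TEST_DATA` environment variable, the `alltypes_plain.parquet` file, and the result iteration via `Relation::next` are assumptions taken from the existing tests; error handling is left to `unwrap` for brevity.

```rust
use std::rc::Rc;

use datafusion::datasource::parquet::ParquetTable;
use datafusion::execution::context::ExecutionContext;

fn main() {
    // Locate the test file the same way the tests do (assumed env var).
    let testdata = std::env::var("PARQUET_TEST_DATA").unwrap();
    let filename = format!("{}/alltypes_plain.parquet", testdata);

    // Register the Parquet file as a queryable table.
    let mut ctx = ExecutionContext::new();
    let table = ParquetTable::try_new(&filename).unwrap();
    ctx.register_table("alltypes_plain", Rc::new(table));

    // Plan and execute a SQL query against the registered table.
    let plan = ctx
        .create_logical_plan("SELECT id, string_col FROM alltypes_plain")
        .unwrap();
    let results = ctx.execute(&plan, 1024).unwrap();

    // Consume the resulting relation one record batch at a time,
    // mirroring result_str() in tests/sql.rs.
    let mut relation = results.borrow_mut();
    while let Some(batch) = relation.next().unwrap() {
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
}
```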