From 41896b5dd48a4bde671d8fa13ee3858fcd0b065b Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Fri, 11 Jan 2019 00:17:32 +0000 Subject: [PATCH] ICE https://github.com/rust-lang/rust/issues/53443 --- src/lib.rs | 3 + src/record/mod.rs | 24 +- src/record/reader.rs | 271 +++++++++- src/record/schemas.rs | 416 +++++++++++++++ src/record/types.rs | 1119 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1815 insertions(+), 18 deletions(-) create mode 100644 src/record/schemas.rs create mode 100644 src/record/types.rs diff --git a/src/lib.rs b/src/lib.rs index f687222..1025af7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,6 +124,9 @@ #![feature(rustc_private)] #![feature(specialization)] #![feature(try_from)] +#![feature(existential_type)] +#![feature(maybe_uninit)] +#![feature(nll)] #![allow(dead_code)] #![allow(non_camel_case_types)] diff --git a/src/record/mod.rs b/src/record/mod.rs index a0b2bee..1c42715 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -20,6 +20,28 @@ mod api; pub mod reader; mod triplet; +pub mod schemas; +pub mod types; +use std::{fmt,fmt::Debug}; +use schema::types::ColumnDescPtr; +use schema::types::ColumnPath; +use std::collections::HashMap; +use errors::ParquetError; +use schema::types::Type; +use record::reader::Reader; +use column::reader::ColumnReader; // pub use self::api::{List, ListAccessor, Map, MapAccessor, Row, RowAccessor}; -pub use self::triplet::TypedTripletIter; +// pub use self::triplet::TypedTripletIter; + +pub trait DebugType { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error>; +} + +pub trait Deserialize: Sized { + type Schema: Debug + DebugType; + type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError>; + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader; +} diff --git a/src/record/reader.rs b/src/record/reader.rs index 2931836..1958cb1 100644 --- a/src/record/reader.rs +++ b/src/record/reader.rs @@ -19,7 +19,6 @@ //! [`Row`](`::record::api::Row`)s. use std::{collections::HashMap, fmt, rc::Rc}; - use basic::{LogicalType, Repetition}; use errors::{ParquetError, Result}; use file::reader::{FileReader, RowGroupReader}; @@ -31,9 +30,10 @@ use schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr}; use record::triplet::TypedTripletIter; use data_type::{BoolType, Int32Type, Int64Type, Int96Type, FloatType, DoubleType, ByteArrayType, FixedLenByteArrayType, Int96}; use basic::Type as PhysicalType; - -// /// Default batch size for a reader -// const DEFAULT_BATCH_SIZE: usize = 1024; +use super::{Deserialize,DebugType}; +use super::types::{Value,Group,Timestamp,List,Map,Root}; +use std::{convert::TryInto,marker::PhantomData}; +use std::error::Error; // /// Tree builder for `Reader` enum. // /// Serves as a container of options for building a reader tree and a builder, and @@ -379,7 +379,7 @@ use basic::Type as PhysicalType; // } // } -impl RRReader for sum::Sum3 where A: RRReader, B: RRReader, C: RRReader { +impl Reader for sum::Sum3 where A: Reader, B: Reader, C: Reader { type Item = A::Item; fn read_field(&mut self) -> Result { @@ -460,7 +460,7 @@ pub struct KeyValueReader { pub values_reader: V, } -pub trait RRReader { +pub trait Reader { type Item; fn read_field(&mut self) -> Result; fn advance_columns(&mut self); @@ -471,7 +471,7 @@ pub trait RRReader { fn current_rep_level(&self) -> i16; } -impl RRReader for BoolReader { +impl Reader for BoolReader { type Item = bool; fn read_field(&mut self) -> Result { @@ -498,7 +498,7 @@ impl RRReader for BoolReader { self.column.current_rep_level() } } -impl RRReader for I32Reader { +impl Reader for I32Reader { type Item = i32; fn read_field(&mut self) -> Result { @@ -525,7 +525,7 @@ impl RRReader for I32Reader { self.column.current_rep_level() } } -impl RRReader for I64Reader { +impl Reader for I64Reader { type Item = i64; fn read_field(&mut self) -> Result { @@ -552,7 +552,7 @@ impl RRReader for I64Reader { self.column.current_rep_level() } } -impl RRReader for I96Reader { +impl Reader for I96Reader { type Item = Int96; fn read_field(&mut self) -> Result { @@ -579,7 +579,7 @@ impl RRReader for I96Reader { self.column.current_rep_level() } } -impl RRReader for F32Reader { +impl Reader for F32Reader { type Item = f32; fn read_field(&mut self) -> Result { @@ -606,7 +606,7 @@ impl RRReader for F32Reader { self.column.current_rep_level() } } -impl RRReader for F64Reader { +impl Reader for F64Reader { type Item = f64; fn read_field(&mut self) -> Result { @@ -633,7 +633,7 @@ impl RRReader for F64Reader { self.column.current_rep_level() } } -impl RRReader for ByteArrayReader { +impl Reader for ByteArrayReader { type Item = Vec; fn read_field(&mut self) -> Result { @@ -660,7 +660,7 @@ impl RRReader for ByteArrayReader { self.column.current_rep_level() } } -impl RRReader for FixedLenByteArrayReader { +impl Reader for FixedLenByteArrayReader { type Item = Vec; fn read_field(&mut self) -> Result { @@ -687,7 +687,7 @@ impl RRReader for FixedLenByteArrayReader { self.column.current_rep_level() } } -impl RRReader for OptionReader { +impl Reader for OptionReader { type Item = Option; fn read_field(&mut self) -> Result { @@ -726,7 +726,7 @@ impl RRReader for OptionReader { } } -impl RRReader for RepeatedReader { +impl Reader for RepeatedReader { type Item = Vec; fn read_field(&mut self) -> Result { @@ -791,7 +791,7 @@ impl RRReader for RepeatedReader { } } -impl RRReader for KeyValueReader { +impl Reader for KeyValueReader { type Item = Vec<(K::Item,V::Item)>; fn read_field(&mut self) -> Result { @@ -860,6 +860,243 @@ impl RRReader for KeyValueReader { } } +pub struct GroupReader { + pub(super) def_level: i16, + pub(super) readers: Vec, + pub(super) fields: Rc>, +} +impl Reader for GroupReader { + type Item = Group; + + fn read_field(&mut self) -> Result { + let mut fields = Vec::new(); + for reader in self.readers.iter_mut() { + fields.push(reader.read_field()?); + } + Ok(Group(fields, self.fields.clone())) + } + fn advance_columns(&mut self) { + for reader in self.readers.iter_mut() { + reader.advance_columns(); + } + } + fn has_next(&self) -> bool { + self.readers.first().unwrap().has_next() + } + fn current_def_level(&self) -> i16 { + match self.readers.first() { + Some(reader) => reader.current_def_level(), + None => panic!("Current definition level: empty group reader"), + } + } + fn current_rep_level(&self) -> i16 { + match self.readers.first() { + Some(reader) => reader.current_rep_level(), + None => panic!("Current repetition level: empty group reader"), + } + } +} + +pub enum ValueReader { + Bool(::Reader), + U8(::Reader), + I8(::Reader), + U16(::Reader), + I16(::Reader), + U32(::Reader), + I32(::Reader), + U64(::Reader), + I64(::Reader), + F32(::Reader), + F64(::Reader), + Timestamp(::Reader), + Array( as Deserialize>::Reader), + String(::Reader), + List(Box< as Deserialize>::Reader>), + Map(Box< as Deserialize>::Reader>), + Group(::Reader), + Option(Box< as Deserialize>::Reader>), +} +impl Reader for ValueReader { + type Item = Value; + + fn read_field(&mut self) -> Result { + match self { + ValueReader::Bool(ref mut reader) => reader.read_field().map(Value::Bool), + ValueReader::U8(ref mut reader) => reader.read_field().map(Value::U8), + ValueReader::I8(ref mut reader) => reader.read_field().map(Value::I8), + ValueReader::U16(ref mut reader) => reader.read_field().map(Value::U16), + ValueReader::I16(ref mut reader) => reader.read_field().map(Value::I16), + ValueReader::U32(ref mut reader) => reader.read_field().map(Value::U32), + ValueReader::I32(ref mut reader) => reader.read_field().map(Value::I32), + ValueReader::U64(ref mut reader) => reader.read_field().map(Value::U64), + ValueReader::I64(ref mut reader) => reader.read_field().map(Value::I64), + ValueReader::F32(ref mut reader) => reader.read_field().map(Value::F32), + ValueReader::F64(ref mut reader) => reader.read_field().map(Value::F64), + ValueReader::Timestamp(ref mut reader) => reader.read_field().map(Value::Timestamp), + ValueReader::Array(ref mut reader) => reader.read_field().map(Value::Array), + ValueReader::String(ref mut reader) => reader.read_field().map(Value::String), + ValueReader::List(ref mut reader) => reader.read_field().map(Value::List), + ValueReader::Map(ref mut reader) => reader.read_field().map(Value::Map), + ValueReader::Group(ref mut reader) => reader.read_field().map(Value::Group), + ValueReader::Option(ref mut reader) => reader.read_field().map(|x|Value::Option(Box::new(x))), + } + } + fn advance_columns(&mut self) { + match self { + ValueReader::Bool(ref mut reader) => reader.advance_columns(), + ValueReader::U8(ref mut reader) => reader.advance_columns(), + ValueReader::I8(ref mut reader) => reader.advance_columns(), + ValueReader::U16(ref mut reader) => reader.advance_columns(), + ValueReader::I16(ref mut reader) => reader.advance_columns(), + ValueReader::U32(ref mut reader) => reader.advance_columns(), + ValueReader::I32(ref mut reader) => reader.advance_columns(), + ValueReader::U64(ref mut reader) => reader.advance_columns(), + ValueReader::I64(ref mut reader) => reader.advance_columns(), + ValueReader::F32(ref mut reader) => reader.advance_columns(), + ValueReader::F64(ref mut reader) => reader.advance_columns(), + ValueReader::Timestamp(ref mut reader) => reader.advance_columns(), + ValueReader::Array(ref mut reader) => reader.advance_columns(), + ValueReader::String(ref mut reader) => reader.advance_columns(), + ValueReader::List(ref mut reader) => reader.advance_columns(), + ValueReader::Map(ref mut reader) => reader.advance_columns(), + ValueReader::Group(ref mut reader) => reader.advance_columns(), + ValueReader::Option(ref mut reader) => reader.advance_columns(), + } + } + fn has_next(&self) -> bool { + match self { + ValueReader::Bool(ref reader) => reader.has_next(), + ValueReader::U8(ref reader) => reader.has_next(), + ValueReader::I8(ref reader) => reader.has_next(), + ValueReader::U16(ref reader) => reader.has_next(), + ValueReader::I16(ref reader) => reader.has_next(), + ValueReader::U32(ref reader) => reader.has_next(), + ValueReader::I32(ref reader) => reader.has_next(), + ValueReader::U64(ref reader) => reader.has_next(), + ValueReader::I64(ref reader) => reader.has_next(), + ValueReader::F32(ref reader) => reader.has_next(), + ValueReader::F64(ref reader) => reader.has_next(), + ValueReader::Timestamp(ref reader) => reader.has_next(), + ValueReader::Array(ref reader) => reader.has_next(), + ValueReader::String(ref reader) => reader.has_next(), + ValueReader::List(ref reader) => reader.has_next(), + ValueReader::Map(ref reader) => reader.has_next(), + ValueReader::Group(ref reader) => reader.has_next(), + ValueReader::Option(ref reader) => reader.has_next(), + } + } + fn current_def_level(&self) -> i16 { + match self { + ValueReader::Bool(ref reader) => reader.current_def_level(), + ValueReader::U8(ref reader) => reader.current_def_level(), + ValueReader::I8(ref reader) => reader.current_def_level(), + ValueReader::U16(ref reader) => reader.current_def_level(), + ValueReader::I16(ref reader) => reader.current_def_level(), + ValueReader::U32(ref reader) => reader.current_def_level(), + ValueReader::I32(ref reader) => reader.current_def_level(), + ValueReader::U64(ref reader) => reader.current_def_level(), + ValueReader::I64(ref reader) => reader.current_def_level(), + ValueReader::F32(ref reader) => reader.current_def_level(), + ValueReader::F64(ref reader) => reader.current_def_level(), + ValueReader::Timestamp(ref reader) => reader.current_def_level(), + ValueReader::Array(ref reader) => reader.current_def_level(), + ValueReader::String(ref reader) => reader.current_def_level(), + ValueReader::List(ref reader) => reader.current_def_level(), + ValueReader::Map(ref reader) => reader.current_def_level(), + ValueReader::Group(ref reader) => reader.current_def_level(), + ValueReader::Option(ref reader) => reader.current_def_level(), + } + } + fn current_rep_level(&self) -> i16 { + match self { + ValueReader::Bool(ref reader) => reader.current_rep_level(), + ValueReader::U8(ref reader) => reader.current_rep_level(), + ValueReader::I8(ref reader) => reader.current_rep_level(), + ValueReader::U16(ref reader) => reader.current_rep_level(), + ValueReader::I16(ref reader) => reader.current_rep_level(), + ValueReader::U32(ref reader) => reader.current_rep_level(), + ValueReader::I32(ref reader) => reader.current_rep_level(), + ValueReader::U64(ref reader) => reader.current_rep_level(), + ValueReader::I64(ref reader) => reader.current_rep_level(), + ValueReader::F32(ref reader) => reader.current_rep_level(), + ValueReader::F64(ref reader) => reader.current_rep_level(), + ValueReader::Timestamp(ref reader) => reader.current_rep_level(), + ValueReader::Array(ref reader) => reader.current_rep_level(), + ValueReader::String(ref reader) => reader.current_rep_level(), + ValueReader::List(ref reader) => reader.current_rep_level(), + ValueReader::Map(ref reader) => reader.current_rep_level(), + ValueReader::Group(ref reader) => reader.current_rep_level(), + ValueReader::Option(ref reader) => reader.current_rep_level(), + } + } +} + +pub struct TupleReader(pub(super) T); + +pub struct TryIntoReader(pub(super) R, pub(super) PhantomData); +impl Reader for TryIntoReader where R::Item: TryInto, >::Error: Error { + type Item = T; + + fn read_field(&mut self) -> Result { + self.0.read_field().and_then(|x|x.try_into().map_err(|err|ParquetError::General(err.description().to_owned()))) + } + fn advance_columns(&mut self) { + self.0.advance_columns() + } + fn has_next(&self) -> bool { + self.0.has_next() + } + fn current_def_level(&self) -> i16 { + self.0.current_def_level() + } + fn current_rep_level(&self) -> i16 { + self.0.current_rep_level() + } +} + +pub struct MapReader(pub(super) R, pub(super) F); +impl Reader for MapReader where F: FnMut(R::Item) -> Result { + type Item = T; + + fn read_field(&mut self) -> Result { + self.0.read_field().and_then(&mut self.1) + } + fn advance_columns(&mut self) { + self.0.advance_columns() + } + fn has_next(&self) -> bool { + self.0.has_next() + } + fn current_def_level(&self) -> i16 { + self.0.current_def_level() + } + fn current_rep_level(&self) -> i16 { + self.0.current_rep_level() + } +} + +pub struct RootReader(pub R); +impl Reader for RootReader where R: Reader { + type Item = Root; + + fn read_field(&mut self) -> Result { + self.0.read_field().map(Root) + } + fn advance_columns(&mut self) { + self.0.advance_columns(); + } + fn has_next(&self) -> bool { + self.0.has_next() + } + fn current_def_level(&self) -> i16 { + self.0.current_def_level() + } + fn current_rep_level(&self) -> i16 { + self.0.current_rep_level() + } +} + // /// Reader tree for record assembly // pub enum Reader { // // Primitive reader with type information and triplet iterator diff --git a/src/record/schemas.rs b/src/record/schemas.rs new file mode 100644 index 0000000..d627897 --- /dev/null +++ b/src/record/schemas.rs @@ -0,0 +1,416 @@ +use crate::errors::ParquetError; +use super::DebugType; +use super::types::Downcast; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::{fmt,fmt::Debug}; + +#[derive(Debug)] +pub enum ValueSchema { + Bool(BoolSchema), + U8(U8Schema), + I8(I8Schema), + U16(U16Schema), + I16(I16Schema), + U32(U32Schema), + I32(I32Schema), + U64(U64Schema), + I64(I64Schema), + F32(F32Schema), + F64(F64Schema), + Timestamp(TimestampSchema), + Array(VecSchema), + String(StringSchema), + List(Box>), + Map(Box>), + Group(GroupSchema), + Option(Box>), +} +impl DebugType for ValueSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("ValueSchema") + } +} +impl ValueSchema { + pub fn is_bool(&self) -> bool { + if let ValueSchema::Bool(ret) = self { true } else { false } + } + pub fn as_bool(self) -> Result { + if let ValueSchema::Bool(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_u8(&self) -> bool { + if let ValueSchema::U8(ret) = self { true } else { false } + } + pub fn as_u8(self) -> Result { + if let ValueSchema::U8(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_i8(&self) -> bool { + if let ValueSchema::I8(ret) = self { true } else { false } + } + pub fn as_i8(self) -> Result { + if let ValueSchema::I8(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_u16(&self) -> bool { + if let ValueSchema::U16(ret) = self { true } else { false } + } + pub fn as_u16(self) -> Result { + if let ValueSchema::U16(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_i16(&self) -> bool { + if let ValueSchema::I16(ret) = self { true } else { false } + } + pub fn as_i16(self) -> Result { + if let ValueSchema::I16(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_u32(&self) -> bool { + if let ValueSchema::U32(ret) = self { true } else { false } + } + pub fn as_u32(self) -> Result { + if let ValueSchema::U32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_i32(&self) -> bool { + if let ValueSchema::I32(ret) = self { true } else { false } + } + pub fn as_i32(self) -> Result { + if let ValueSchema::I32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_u64(&self) -> bool { + if let ValueSchema::U64(ret) = self { true } else { false } + } + pub fn as_u64(self) -> Result { + if let ValueSchema::U64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_i64(&self) -> bool { + if let ValueSchema::I64(ret) = self { true } else { false } + } + pub fn as_i64(self) -> Result { + if let ValueSchema::I64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_f32(&self) -> bool { + if let ValueSchema::F32(ret) = self { true } else { false } + } + pub fn as_f32(self) -> Result { + if let ValueSchema::F32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_f64(&self) -> bool { + if let ValueSchema::F64(ret) = self { true } else { false } + } + pub fn as_f64(self) -> Result { + if let ValueSchema::F64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_timestamp(&self) -> bool { + if let ValueSchema::Timestamp(ret) = self { true } else { false } + } + pub fn as_timestamp(self) -> Result { + if let ValueSchema::Timestamp(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_array(&self) -> bool { + if let ValueSchema::Array(ret) = self { true } else { false } + } + pub fn as_array(self) -> Result { + if let ValueSchema::Array(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_string(&self) -> bool { + if let ValueSchema::String(ret) = self { true } else { false } + } + pub fn as_string(self) -> Result { + if let ValueSchema::String(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_list(&self) -> bool { + if let ValueSchema::List(ret) = self { true } else { false } + } + pub fn as_list(self) -> Result, ParquetError> { + if let ValueSchema::List(ret) = self { Ok(*ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_map(&self) -> bool { + if let ValueSchema::Map(ret) = self { true } else { false } + } + pub fn as_map(self) -> Result, ParquetError> { + if let ValueSchema::Map(ret) = self { Ok(*ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_group(&self) -> bool { + if let ValueSchema::Group(ret) = self { true } else { false } + } + pub fn as_group(self) -> Result { + if let ValueSchema::Group(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + pub fn is_option(&self) -> bool { + if let ValueSchema::Option(ret) = self { true } else { false } + } + pub fn as_option(self) -> Result, ParquetError> { + if let ValueSchema::Option(ret) = self { Ok(*ret) } else { Err(ParquetError::General(String::from(""))) } + } +} + +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + Ok(self) + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_bool() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_u8() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_i8() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_u16() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_i16() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_u32() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_i32() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_u64() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_i64() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_f32() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_f64() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_timestamp() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_array() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_string() + } +} +impl Downcast> for ValueSchema where ValueSchema: Downcast { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_list()?; + Ok(ListSchema(ret.0.downcast()?, ret.1)) + } +} +impl Downcast> for ValueSchema { + fn downcast(self) -> Result,ParquetError> { + self.as_list() + } +} +impl Downcast> for ValueSchema where ValueSchema: Downcast + Downcast { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_map()?; + Ok(MapSchema(ret.0.downcast()?, ret.1.downcast()?, ret.2, ret.3, ret.4)) + } +} +impl Downcast> for ValueSchema { + fn downcast(self) -> Result,ParquetError> { + self.as_map() + } +} +impl Downcast for ValueSchema { + fn downcast(self) -> Result { + self.as_group() + } +} +impl Downcast> for ValueSchema where ValueSchema: Downcast { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_option()?; + ret.0.downcast().map(OptionSchema) + } +} +impl Downcast> for ValueSchema { + fn downcast(self) -> Result,ParquetError> { + self.as_option() + } +} + +#[derive(Debug)] +pub struct GroupSchema(pub(super) Vec,pub(super) HashMap); +impl DebugType for GroupSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("GroupSchema") + } +} + +pub struct RootSchema(pub String, pub S, pub PhantomData); +impl Debug for RootSchema where S: Debug { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_tuple("RootSchema").field(&self.0).field(&self.1).finish() + } +} +impl DebugType for RootSchema where S: DebugType { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("RootSchema") + } +} + +#[derive(Debug)] +pub struct VecSchema(pub(super) Option); +impl DebugType for VecSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("VecSchema") + } +} + +pub struct ArraySchema(pub(super) PhantomData); +impl Debug for ArraySchema { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_tuple("ArraySchema").finish() + } +} +impl DebugType for ArraySchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("ArraySchema") + } +} + +pub struct TupleSchema(pub(super) T); + +#[derive(Debug)] +pub struct MapSchema(pub(super) K,pub(super) V,pub(super) Option,pub(super) Option,pub(super) Option); +impl DebugType for MapSchema where K: DebugType, V: DebugType { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("MapSchema") + } +} +#[derive(Debug)] +pub struct OptionSchema(pub(super) T); +impl DebugType for OptionSchema where T: DebugType { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("OptionSchema") + } +} +#[derive(Debug)] +pub struct ListSchema(pub(super) T,pub(super) Option<(Option,Option)>); +impl DebugType for ListSchema where T: DebugType { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("ListSchema") + } +} +#[derive(Debug)] +pub struct BoolSchema; +impl DebugType for BoolSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("BoolSchema") + } +} +#[derive(Debug)] +pub struct U8Schema; +impl DebugType for U8Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("U8Schema") + } +} +#[derive(Debug)] +pub struct I8Schema; +impl DebugType for I8Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("I8Schema") + } +} +#[derive(Debug)] +pub struct U16Schema; +impl DebugType for U16Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("U16Schema") + } +} +#[derive(Debug)] +pub struct I16Schema; +impl DebugType for I16Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("I16Schema") + } +} +#[derive(Debug)] +pub struct U32Schema; +impl DebugType for U32Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("U32Schema") + } +} +#[derive(Debug)] +pub struct I32Schema; +impl DebugType for I32Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("I32Schema") + } +} +#[derive(Debug)] +pub struct U64Schema; +impl DebugType for U64Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("U64Schema") + } +} +#[derive(Debug)] +pub struct I64Schema; +impl DebugType for I64Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("I64Schema") + } +} +#[derive(Debug)] +pub struct F64Schema; +impl DebugType for F64Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("F64Schema") + } +} +#[derive(Debug)] +pub struct F32Schema; +impl DebugType for F32Schema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("F32Schema") + } +} +#[derive(Debug)] +pub struct StringSchema; +impl DebugType for StringSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("StringSchema") + } +} +#[derive(Debug)] +pub enum TimestampSchema { + Int96, + Millis, + Micros, +} +impl DebugType for TimestampSchema { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("TimestampSchema") + } +} \ No newline at end of file diff --git a/src/record/types.rs b/src/record/types.rs new file mode 100644 index 0000000..380f03a --- /dev/null +++ b/src/record/types.rs @@ -0,0 +1,1119 @@ +use crate::{ + basic::{LogicalType, Repetition, Type as PhysicalType}, errors::ParquetError, file::reader::{FileReader, ParquetReader, SerializedFileReader}, schema::{ + printer::{print_file_metadata, print_parquet_metadata, print_schema}, types::{BasicTypeInfo, SchemaDescPtr, SchemaDescriptor, Type} + } +}; +use super::triplet::TypedTripletIter; +use crate::data_type::{BoolType,Int32Type,Int64Type,Int96Type,FloatType,DoubleType,ByteArrayType,FixedLenByteArrayType}; +use crate::column::reader::ColumnReader; +use crate::schema::types::ColumnDescPtr; +use crate::schema::types::ColumnPath; +use crate::data_type::Int96; +use crate::schema::parser::parse_message_type; +use super::{Deserialize,DebugType}; +use super::schemas::{ValueSchema,BoolSchema,U8Schema,I8Schema,U16Schema,I16Schema,U32Schema,I32Schema,U64Schema,I64Schema,F32Schema,F64Schema,VecSchema,ArraySchema,StringSchema,TimestampSchema,RootSchema,GroupSchema,TupleSchema,OptionSchema,ListSchema,MapSchema}; +use super::reader::{ValueReader,BoolReader,I32Reader,I64Reader,I96Reader,F32Reader,F64Reader,ByteArrayReader,FixedLenByteArrayReader,OptionReader,RepeatedReader,KeyValueReader,RootReader,GroupReader,TupleReader,Reader,MapReader,TryIntoReader}; +use std::collections::HashMap; +use std::hash::{Hash,Hasher}; +use std::{rc::Rc,fmt,str}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::convert::TryInto; +use std::num::TryFromIntError; +use std::string::FromUtf8Error; +use std::error::Error; + +/// Default batch size for a reader +const DEFAULT_BATCH_SIZE: usize = 1024; + +#[derive(Clone, Hash, PartialEq, Eq, Debug)] +pub struct Root(pub T); +#[derive(Clone, Hash, PartialEq, Eq, Debug)] +pub struct Date(pub(super) i32); +#[derive(Clone, Hash, PartialEq, Eq, Debug)] +pub struct Time(pub(super) i64); + +const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; +const SECONDS_PER_DAY: i64 = 86_400; +const MILLIS_PER_SECOND: i64 = 1_000; +const MICROS_PER_MILLI: i64 = 1_000; + +#[derive(Clone, Hash, PartialEq, Eq, Debug)] +pub struct Timestamp(pub(super) Int96); +impl Timestamp { + fn as_day_nanos(&self) -> (i64,i64) { + let day = self.0.data()[2] as i64; + let nanoseconds = ((self.0.data()[1] as i64) << 32) + self.0.data()[0] as i64; + (day, nanoseconds) + } + fn as_millis(&self) -> Option { + let day = self.0.data()[2] as i64; + let nanoseconds = ((self.0.data()[1] as i64) << 32) + self.0.data()[0] as i64; + let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + Some(seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000) + } + fn as_micros(&self) -> Option { + let day = self.0.data()[2] as i64; + let nanoseconds = ((self.0.data()[1] as i64) << 32) + self.0.data()[0] as i64; + let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + Some(seconds * MILLIS_PER_SECOND * MICROS_PER_MILLI + nanoseconds / 1_000) + } + fn as_nanos(&self) -> Option { + let day = self.0.data()[2] as i64; + let nanoseconds = ((self.0.data()[1] as i64) << 32) + self.0.data()[0] as i64; + let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + Some(seconds * MILLIS_PER_SECOND * MICROS_PER_MILLI * 1_000 + nanoseconds) + } +} + +#[derive(Clone, Hash, PartialEq, Eq, Debug)] +pub struct List(pub(super) Vec); +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct Map(pub(super) HashMap); + +#[derive(Clone, PartialEq, Debug)] +pub enum Value { + Bool(bool), + U8(u8), + I8(i8), + U16(u16), + I16(i16), + U32(u32), + I32(i32), + U64(u64), + I64(i64), + F32(f32), + F64(f64), + Timestamp(Timestamp), + Array(Vec), + String(String), + List(List), + Map(Map), + Group(Group), + Option(Box>), +} +impl Hash for Value { + fn hash(&self, state: &mut H) { + match self { + Value::Bool(value) => {0u8.hash(state); value.hash(state);} + Value::U8(value) => {1u8.hash(state); value.hash(state);} + Value::I8(value) => {2u8.hash(state); value.hash(state);} + Value::U16(value) => {3u8.hash(state); value.hash(state);} + Value::I16(value) => {4u8.hash(state); value.hash(state);} + Value::U32(value) => {5u8.hash(state); value.hash(state);} + Value::I32(value) => {6u8.hash(state); value.hash(state);} + Value::U64(value) => {7u8.hash(state); value.hash(state);} + Value::I64(value) => {8u8.hash(state); value.hash(state);} + Value::F32(value) => {9u8.hash(state);} + Value::F64(value) => {10u8.hash(state);} + Value::Timestamp(value) => {11u8.hash(state); value.hash(state);} + Value::Array(value) => {12u8.hash(state); value.hash(state);} + Value::String(value) => {13u8.hash(state); value.hash(state);} + Value::List(value) => {14u8.hash(state); value.hash(state);} + Value::Map(value) => {15u8.hash(state);} + Value::Group(value) => {16u8.hash(state);} + Value::Option(value) => {17u8.hash(state); value.hash(state);} + } + } +} +impl Eq for Value {} + +impl Value { + fn is_bool(&self) -> bool { + if let Value::Bool(ret) = self { true } else { false } + } + fn as_bool(self) -> Result { + if let Value::Bool(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_u8(&self) -> bool { + if let Value::U8(ret) = self { true } else { false } + } + fn as_u8(self) -> Result { + if let Value::U8(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_i8(&self) -> bool { + if let Value::I8(ret) = self { true } else { false } + } + fn as_i8(self) -> Result { + if let Value::I8(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_u16(&self) -> bool { + if let Value::U16(ret) = self { true } else { false } + } + fn as_u16(self) -> Result { + if let Value::U16(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_i16(&self) -> bool { + if let Value::I16(ret) = self { true } else { false } + } + fn as_i16(self) -> Result { + if let Value::I16(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_u32(&self) -> bool { + if let Value::U32(ret) = self { true } else { false } + } + fn as_u32(self) -> Result { + if let Value::U32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_i32(&self) -> bool { + if let Value::I32(ret) = self { true } else { false } + } + fn as_i32(self) -> Result { + if let Value::I32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_u64(&self) -> bool { + if let Value::U64(ret) = self { true } else { false } + } + fn as_u64(self) -> Result { + if let Value::U64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_i64(&self) -> bool { + if let Value::I64(ret) = self { true } else { false } + } + fn as_i64(self) -> Result { + if let Value::I64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_f32(&self) -> bool { + if let Value::F32(ret) = self { true } else { false } + } + fn as_f32(self) -> Result { + if let Value::F32(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_f64(&self) -> bool { + if let Value::F64(ret) = self { true } else { false } + } + fn as_f64(self) -> Result { + if let Value::F64(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_timestamp(&self) -> bool { + if let Value::Timestamp(ret) = self { true } else { false } + } + fn as_timestamp(self) -> Result { + if let Value::Timestamp(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_array(&self) -> bool { + if let Value::Array(ret) = self { true } else { false } + } + fn as_array(self) -> Result, ParquetError> { + if let Value::Array(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_string(&self) -> bool { + if let Value::String(ret) = self { true } else { false } + } + fn as_string(self) -> Result { + if let Value::String(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_list(&self) -> bool { + if let Value::List(ret) = self { true } else { false } + } + fn as_list(self) -> Result, ParquetError> { + if let Value::List(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_map(&self) -> bool { + if let Value::Map(ret) = self { true } else { false } + } + fn as_map(self) -> Result, ParquetError> { + if let Value::Map(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_group(&self) -> bool { + if let Value::Group(ret) = self { true } else { false } + } + fn as_group(self) -> Result { + if let Value::Group(ret) = self { Ok(ret) } else { Err(ParquetError::General(String::from(""))) } + } + fn is_option(&self) -> bool { + if let Value::Option(ret) = self { true } else { false } + } + fn as_option(self) -> Result, ParquetError> { + if let Value::Option(ret) = self { Ok(*ret) } else { Err(ParquetError::General(String::from(""))) } + } +} + +pub trait Downcast { + fn downcast(self) -> Result; +} +impl Downcast for Value { + fn downcast(self) -> Result { + Ok(self) + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_bool() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_u8() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_i8() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_u16() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_i16() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_u32() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_i32() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_u64() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_i64() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_f32() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_f64() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_timestamp() + } +} +impl Downcast> for Value { + fn downcast(self) -> Result,ParquetError> { + self.as_array() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_string() + } +} +impl Downcast> for Value where Value: Downcast { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_list()?; + ret.0.into_iter().map(Downcast::downcast).collect::,_>>().map(List) + } +} +impl Downcast> for Value { + fn downcast(self) -> Result,ParquetError> { + self.as_list() + } +} +impl Downcast> for Value where Value: Downcast + Downcast, K: Hash+Eq { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_map()?; + ret.0.into_iter().map(|(k,v)|Ok((k.downcast()?,v.downcast()?))).collect::,_>>().map(Map) + } +} +impl Downcast> for Value { + fn downcast(self) -> Result,ParquetError> { + self.as_map() + } +} +impl Downcast for Value { + fn downcast(self) -> Result { + self.as_group() + } +} +impl Downcast> for Value where Value: Downcast { + default fn downcast(self) -> Result,ParquetError> { + let ret = self.as_option()?; + match ret { + Some(t) => Downcast::::downcast(t).map(Some), + None => Ok(None), + } + } +} +impl Downcast> for Value { + fn downcast(self) -> Result,ParquetError> { + self.as_option() + } +} + +impl Deserialize for Value { + type Schema = ValueSchema; + type Reader = ValueReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + let mut value = None; + if schema.is_primitive() { + value = Some(match (schema.get_physical_type(), schema.get_basic_info().logical_type()) { + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + (PhysicalType::BOOLEAN,LogicalType::NONE) => ValueSchema::Bool(BoolSchema), + (PhysicalType::INT32,LogicalType::UINT_8) => ValueSchema::U8(U8Schema), + (PhysicalType::INT32,LogicalType::INT_8) => ValueSchema::I8(I8Schema), + (PhysicalType::INT32,LogicalType::UINT_16) => ValueSchema::U16(U16Schema), + (PhysicalType::INT32,LogicalType::INT_16) => ValueSchema::I16(I16Schema), + (PhysicalType::INT32,LogicalType::UINT_32) => ValueSchema::U32(U32Schema), + (PhysicalType::INT32,LogicalType::INT_32) | (PhysicalType::INT32, LogicalType::NONE) => ValueSchema::I32(I32Schema), + (PhysicalType::INT32,LogicalType::DATE) => unimplemented!(), + (PhysicalType::INT32,LogicalType::TIME_MILLIS) => unimplemented!(), + (PhysicalType::INT32,LogicalType::DECIMAL) => unimplemented!(), + (PhysicalType::INT64,LogicalType::UINT_64) => ValueSchema::U64(U64Schema), + (PhysicalType::INT64,LogicalType::INT_64) | (PhysicalType::INT64,LogicalType::NONE) => ValueSchema::I64(I64Schema), + (PhysicalType::INT64,LogicalType::TIME_MICROS) => unimplemented!(), + // (PhysicalType::INT64,LogicalType::TIME_NANOS) => unimplemented!(), + (PhysicalType::INT64,LogicalType::TIMESTAMP_MILLIS) => ValueSchema::Timestamp(TimestampSchema::Millis), + (PhysicalType::INT64,LogicalType::TIMESTAMP_MICROS) => ValueSchema::Timestamp(TimestampSchema::Micros), + // (PhysicalType::INT64,LogicalType::TIMESTAMP_NANOS) => unimplemented!(), + (PhysicalType::INT64,LogicalType::DECIMAL) => unimplemented!(), + (PhysicalType::INT96,LogicalType::NONE) => ValueSchema::Timestamp(TimestampSchema::Int96), + (PhysicalType::FLOAT,LogicalType::NONE) => ValueSchema::F32(F32Schema), + (PhysicalType::DOUBLE,LogicalType::NONE) => ValueSchema::F64(F64Schema), + (PhysicalType::BYTE_ARRAY,LogicalType::UTF8) | (PhysicalType::BYTE_ARRAY,LogicalType::ENUM) | (PhysicalType::BYTE_ARRAY,LogicalType::JSON) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::UTF8) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::ENUM) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::JSON) => ValueSchema::String(StringSchema), + (PhysicalType::BYTE_ARRAY,LogicalType::NONE) | (PhysicalType::BYTE_ARRAY,LogicalType::BSON) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::NONE) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::BSON) => ValueSchema::Array(VecSchema(if schema.get_physical_type() == PhysicalType::FIXED_LEN_BYTE_ARRAY { Some(schema.get_type_length().try_into().unwrap()) } else { None })), + (PhysicalType::BYTE_ARRAY,LogicalType::DECIMAL) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::DECIMAL) => unimplemented!(), + (PhysicalType::BYTE_ARRAY,LogicalType::INTERVAL) | (PhysicalType::FIXED_LEN_BYTE_ARRAY,LogicalType::INTERVAL) => unimplemented!(), + _ => return Err(ParquetError::General(String::from("Value"))), + }); + } + if value.is_none() && schema.is_group() && !schema.is_schema() && schema.get_basic_info().logical_type() == LogicalType::LIST && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 1 { + let element = sub_schema.get_fields().into_iter().nth(0).unwrap(); + let list_name = if sub_schema.name() == "list" { None } else { Some(sub_schema.name().to_owned()) }; + let element_name = if element.name() == "element" { None } else { Some(element.name().to_owned()) }; + value = Some(ValueSchema::List(Box::new(ListSchema(Value::parse(&*element)?.1, Some((list_name, element_name)))))); + } + // Err(ParquetError::General(String::from("List"))) + } + // TODO https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + if value.is_none() && schema.is_group() && !schema.is_schema() && (schema.get_basic_info().logical_type() == LogicalType::MAP || schema.get_basic_info().logical_type() == LogicalType::MAP_KEY_VALUE) && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 2 { + let mut fields = sub_schema.get_fields().into_iter(); + let (key, value_) = (fields.next().unwrap(), fields.next().unwrap()); + let key_value_name = if sub_schema.name() == "key_value" { None } else { Some(sub_schema.name().to_owned()) }; + let key_name = if key.name() == "key" { None } else { Some(key.name().to_owned()) }; + let value_name = if value_.name() == "value" { None } else { Some(value_.name().to_owned()) }; + value = Some(ValueSchema::Map(Box::new(MapSchema(Value::parse(&*key)?.1,Value::parse(&*value_)?.1, key_value_name, key_name, value_name)))); + } + } + + if value.is_none() && schema.is_group() && !schema.is_schema() { + let mut lookup = HashMap::new(); + value = Some(ValueSchema::Group(GroupSchema(schema.get_fields().iter().map(|schema|Value::parse(&*schema).map(|(name,schema)| {let x = lookup.insert(name, lookup.len()); assert!(x.is_none()); schema})).collect::,_>>()?, lookup))); + } + + if value.is_none() { + println!("errrrr"); + println!("{:?}", schema); + } + + let mut value = value.ok_or(ParquetError::General(String::from("Value")))?; + + match schema.get_basic_info().repetition() { + Repetition::OPTIONAL => { + value = ValueSchema::Option(Box::new(OptionSchema(value))); + } + Repetition::REPEATED => { + value = ValueSchema::List(Box::new(ListSchema(value, None))); + } + Repetition::REQUIRED => (), + } + + Ok((schema.name().to_owned(),value)) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + match *schema { + ValueSchema::Bool(ref schema) => ValueReader::Bool(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::U8(ref schema) => ValueReader::U8(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::I8(ref schema) => ValueReader::I8(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::U16(ref schema) => ValueReader::U16(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::I16(ref schema) => ValueReader::I16(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::U32(ref schema) => ValueReader::U32(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::I32(ref schema) => ValueReader::I32(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::U64(ref schema) => ValueReader::U64(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::I64(ref schema) => ValueReader::I64(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::F32(ref schema) => ValueReader::F32(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::F64(ref schema) => ValueReader::F64(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::Timestamp(ref schema) => ValueReader::Timestamp(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::Array(ref schema) => ValueReader::Array( as Deserialize>::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::String(ref schema) => ValueReader::String(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::List(ref schema) => ValueReader::List(Box::new( as Deserialize>::reader(schema, path, curr_def_level, curr_rep_level, paths))), + ValueSchema::Map(ref schema) => ValueReader::Map(Box::new( as Deserialize>::reader(schema, path, curr_def_level, curr_rep_level, paths))), + ValueSchema::Group(ref schema) => ValueReader::Group(::reader(schema, path, curr_def_level, curr_rep_level, paths)), + ValueSchema::Option(ref schema) => ValueReader::Option(Box::new( as Deserialize>::reader(schema, path, curr_def_level, curr_rep_level, paths))), + } + } +} +impl Deserialize for Root { + type Schema = RootSchema; + type Reader = RootReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + assert!(schema.is_schema()); + let mut value = None; + if value.is_none() && schema.is_group() && schema.get_basic_info().logical_type() == LogicalType::LIST && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 1 { + let element = sub_schema.get_fields().into_iter().nth(0).unwrap(); + let list_name = if sub_schema.name() == "list" { None } else { Some(sub_schema.name().to_owned()) }; + let element_name = if element.name() == "element" { None } else { Some(element.name().to_owned()) }; + value = Some(ValueSchema::List(Box::new(ListSchema(Value::parse(&*element)?.1, Some((list_name, element_name)))))); + } + // Err(ParquetError::General(String::from("List"))) + } + // TODO https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + if value.is_none() && schema.is_group() && (schema.get_basic_info().logical_type() == LogicalType::MAP || schema.get_basic_info().logical_type() == LogicalType::MAP_KEY_VALUE) && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 2 { + let mut fields = sub_schema.get_fields().into_iter(); + let (key, value_) = (fields.next().unwrap(), fields.next().unwrap()); + let key_value_name = if sub_schema.name() == "key_value" { None } else { Some(sub_schema.name().to_owned()) }; + let key_name = if key.name() == "key" { None } else { Some(key.name().to_owned()) }; + let value_name = if value_.name() == "value" { None } else { Some(value_.name().to_owned()) }; + value = Some(ValueSchema::Map(Box::new(MapSchema(Value::parse(&*key)?.1,Value::parse(&*value_)?.1, key_value_name, key_name, value_name)))); + } + } + + if value.is_none() && schema.is_group() { + let mut lookup = HashMap::new(); + value = Some(ValueSchema::Group(GroupSchema(schema.get_fields().iter().map(|schema|Value::parse(&*schema).map(|(name,schema)| {let x = lookup.insert(name, lookup.len()); assert!(x.is_none()); schema})).collect::,_>>()?, lookup))); + } + + if value.is_none() { + println!("errrrr"); + println!("{:?}", schema); + } + + let mut value = value.ok_or(ParquetError::General(String::from("Value")))?; + + Ok((String::from(""),RootSchema(schema.name().to_owned(), value, PhantomData))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + RootReader(::reader(&schema.1, path, curr_def_level, curr_rep_level, paths)) + } +} + + +#[derive(Clone, PartialEq, Debug)] +pub struct Group(pub Vec,pub Rc>); +pub type Row = Group; + +impl Deserialize for Group { + type Schema = GroupSchema; + type Reader = GroupReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + let mut map = HashMap::new(); + let fields = schema.get_fields().iter().enumerate().map(|(i,field)| { + let (name, schema) = ::parse(&**field)?; + let x = map.insert(name, i); + assert!(x.is_none()); + Ok(schema) + }).collect::,ParquetError>>()?; + let schema_ = GroupSchema(fields, map);//$struct_schema{$($name: fields.get(stringify!($name)).ok_or(ParquetError::General(format!("Struct {} missing field {}", stringify!($struct), stringify!($name)))).and_then(|x|<$type_ as Deserialize>::parse(&**x))?.1,)*}; + return Ok((schema.name().to_owned(), schema_)) + } + Err(ParquetError::General(format!("Struct {}", stringify!($struct)))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let mut names_ = vec![None; schema.0.len()]; + for (name,&index) in schema.1.iter() { + names_[index].replace(name.to_owned()); + } + let readers = schema.0.iter().enumerate().map(|(i,field)| { + path.push(names_[i].take().unwrap()); + let ret = Value::reader(field, path, curr_def_level, curr_rep_level, paths); + path.pop().unwrap(); + ret + }).collect(); + GroupReader{def_level: curr_def_level, readers, fields: Rc::new(schema.1.clone())} + } +} + + +impl Deserialize for Map +where + K: Deserialize + Hash + Eq, + V: Deserialize, +{ + type Schema = MapSchema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED && (schema.get_basic_info().logical_type() == LogicalType::MAP || schema.get_basic_info().logical_type() == LogicalType::MAP_KEY_VALUE) && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 2 { + let mut fields = sub_schema.get_fields().into_iter(); + let (key, value) = (fields.next().unwrap(), fields.next().unwrap()); + let key_value_name = if sub_schema.name() == "key_value" { None } else { Some(sub_schema.name().to_owned()) }; + let key_name = if key.name() == "key" { None } else { Some(key.name().to_owned()) }; + let value_name = if value.name() == "value" { None } else { Some(value.name().to_owned()) }; + return Ok((schema.name().to_owned(), MapSchema(K::parse(&*key)?.1,V::parse(&*value)?.1, key_value_name, key_name, value_name))); + } + } + Err(ParquetError::General(String::from("Map"))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let key_value_name = schema.2.as_ref().map(|x|&**x).unwrap_or("key_value"); + let key_name = schema.3.as_ref().map(|x|&**x).unwrap_or("key"); + let value_name = schema.4.as_ref().map(|x|&**x).unwrap_or("value"); + + path.push(key_value_name.to_owned()); + path.push(key_name.to_owned()); + let keys_reader = K::reader(&schema.0, path, curr_def_level + 1, curr_rep_level + 1, paths); + path.pop().unwrap(); + path.push(value_name.to_owned()); + let values_reader = V::reader(&schema.1, path, curr_def_level + 1, curr_rep_level + 1, paths); + path.pop().unwrap(); + path.pop().unwrap(); + + MapReader(KeyValueReader{ + def_level: curr_def_level, + rep_level: curr_rep_level, + keys_reader, + values_reader + }, (|x:Vec<_>|Ok(Map(x.into_iter().collect()))) as fn(_)->_) + } +} + +impl Deserialize for List +where + T: Deserialize, +{ + type Schema = ListSchema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + // ::parse(schema).and_then(|(name, schema)| { + // match schema { + // ValueSchema::List(box ListSchema(schema, a)) => Ok((name, ListSchema(schema, a))), + // _ => Err(ParquetError::General(String::from(""))), + // } + // }) + if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED && schema.get_basic_info().logical_type() == LogicalType::LIST && schema.get_fields().len() == 1 { + let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); + if sub_schema.is_group() && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 1 { + let element = sub_schema.get_fields().into_iter().nth(0).unwrap(); + let list_name = if sub_schema.name() == "list" { None } else { Some(sub_schema.name().to_owned()) }; + let element_name = if element.name() == "element" { None } else { Some(element.name().to_owned()) }; + return Ok((schema.name().to_owned(), ListSchema(T::parse(&*element)?.1, Some((list_name, element_name))))); + } + } + if schema.get_basic_info().repetition() == Repetition::REPEATED { + let mut schema2: Type = schema.clone(); + let basic_info = match schema2 {Type::PrimitiveType{ref mut basic_info,..} => basic_info, Type::GroupType{ref mut basic_info,..} => basic_info}; + basic_info.set_repetition(Some(Repetition::REQUIRED)); + return Ok((schema.name().to_owned(), ListSchema(T::parse(&schema2)?.1, None))); + } + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + Err(ParquetError::General(String::from("List"))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + MapReader(if let Some((ref list_name,ref element_name)) = schema.1 { + let list_name = list_name.as_ref().map(|x|&**x).unwrap_or("list"); + let element_name = element_name.as_ref().map(|x|&**x).unwrap_or("element"); + + path.push(list_name.to_owned()); + path.push(element_name.to_owned()); + let reader = T::reader(&schema.0, path, curr_def_level + 1, curr_rep_level + 1, paths); + path.pop().unwrap(); + path.pop().unwrap(); + + RepeatedReader{ + def_level: curr_def_level, + rep_level: curr_rep_level, + reader, + } + } else { + let reader = T::reader(&schema.0, path, curr_def_level + 1, curr_rep_level + 1, paths); + RepeatedReader{ + def_level: curr_def_level, + rep_level: curr_rep_level, + reader, + } + }, (|x|Ok(List(x))) as fn(_)->_) + } +} + +impl Deserialize for Option where T: Deserialize, ValueSchema: Downcast<::Schema> { + type Schema = OptionSchema; + type Reader = OptionReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + ::parse(schema).and_then(|(name, schema)| { + Ok((name, OptionSchema(schema.as_option()?.0.downcast()?))) + }) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + OptionReader{def_level: curr_def_level, reader: ::reader(&schema.0, path, curr_def_level+1, curr_rep_level, paths)} + } +} + +fn downcast((name, schema): (String, ValueSchema)) -> Result<(String, T),ParquetError> where ValueSchema: Downcast { + schema.downcast().map(|schema| (name, schema)) +} + + +impl Deserialize for bool { + type Schema = BoolSchema; + type Reader = BoolReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + BoolReader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for f32 { + type Schema = F32Schema; + type Reader = F32Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + F32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for f64 { + type Schema = F64Schema; + type Reader = F64Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + F64Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for i8 { + type Schema = I8Schema; + type Reader = TryIntoReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + TryIntoReader(I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, PhantomData) + } +} +impl Deserialize for u8 { + type Schema = U8Schema; + type Reader = TryIntoReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + TryIntoReader(I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, PhantomData) + } +} +impl Deserialize for i16 { + type Schema = I16Schema; + type Reader = TryIntoReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + TryIntoReader(I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, PhantomData) + } +} +impl Deserialize for u16 { + type Schema = U16Schema; + type Reader = TryIntoReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + TryIntoReader(I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, PhantomData) + } +} +impl Deserialize for i32 { + type Schema = I32Schema; + type Reader = I32Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for u32 { + type Schema = U32Schema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + MapReader(I32Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|x|Ok(x as u32)) as fn(_)->_) + } +} +impl Deserialize for i64 { + type Schema = I64Schema; + type Reader = I64Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + I64Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for u64 { + type Schema = U64Schema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + MapReader(I64Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|x|Ok(x as u64)) as fn(_)->_) + } +} +impl Deserialize for Timestamp { + type Schema = TimestampSchema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + match schema { + TimestampSchema::Int96 => { + sum::Sum3::A(MapReader(I96Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|x|Ok(Timestamp(x))) as fn(_)->_)) + } + TimestampSchema::Millis => { + sum::Sum3::B(MapReader(I64Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|millis| { + let day: i64 = ((JULIAN_DAY_OF_EPOCH * SECONDS_PER_DAY * MILLIS_PER_SECOND) + millis) / (SECONDS_PER_DAY * MILLIS_PER_SECOND); + let nanoseconds: i64 = (millis - ((day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY * MILLIS_PER_SECOND)) * 1_000_000; + + Ok(Timestamp(Int96::new((nanoseconds & 0xffff).try_into().unwrap(),((nanoseconds as u64) >> 32).try_into().unwrap(),day.try_into().map_err(|err:TryFromIntError|ParquetError::General(err.description().to_owned()))?))) + }) as fn(_)->_)) + } + TimestampSchema::Micros => { + sum::Sum3::C(MapReader(I64Reader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|micros| { + let day: i64 = ((JULIAN_DAY_OF_EPOCH * SECONDS_PER_DAY * MILLIS_PER_SECOND * MICROS_PER_MILLI) + micros) / (SECONDS_PER_DAY * MILLIS_PER_SECOND * MICROS_PER_MILLI); + let nanoseconds: i64 = (micros - ((day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY * MILLIS_PER_SECOND * MICROS_PER_MILLI)) * 1_000; + + Ok(Timestamp(Int96::new((nanoseconds & 0xffff).try_into().unwrap(),((nanoseconds as u64) >> 32).try_into().unwrap(),day.try_into().map_err(|err:TryFromIntError|ParquetError::General(err.description().to_owned()))?))) + }) as fn(_)->_)) + } + } + } +} +// impl Deserialize for parquet::data_type::Decimal { +// type Schema = DecimalSchema; +// type Reader = Reader; +// fn placeholder() -> Self::Schema { +// +// } +// fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { +// unimplemented!() +// } +// fn render(name: &str, schema: &Self::Schema) -> Type { +// Type::primitive_type_builder(name, PhysicalType::DOUBLE) +// .with_repetition(Repetition::REQUIRED) +// .with_logical_type(LogicalType::NONE) +// .with_length(-1) +// .with_precision(-1) +// .with_scale(-1) +// .build().unwrap() +// Type::PrimitiveType { +// basic_info: BasicTypeInfo { +// name: String::from(schema), +// repetition: Some(Repetition::REQUIRED), +// logical_type: LogicalType::DECIMAL, +// id: None, +// } +// physical_type: PhysicalType:: +// } +// } +// struct DecimalSchema { +// scale: u32, +// precision: u32, +// } +impl Deserialize for Vec { + type Schema = VecSchema; + type Reader = ByteArrayReader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + ByteArrayReader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)} + } +} +impl Deserialize for String { + type Schema = StringSchema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + Value::parse(schema).and_then(downcast) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + MapReader(ByteArrayReader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|x|String::from_utf8(x).map_err(|err:FromUtf8Error|ParquetError::General(err.to_string()))) as fn(_)->_) + } +} +impl Deserialize for [u8; 1024] { + type Schema = ArraySchema; + existential type Reader: Reader; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + if schema.is_primitive() && schema.get_basic_info().repetition() == Repetition::REQUIRED && schema.get_physical_type() == PhysicalType::FIXED_LEN_BYTE_ARRAY && schema.get_basic_info().logical_type() == LogicalType::NONE && schema.get_type_length() == 1024 { + return Ok((schema.name().to_owned(), ArraySchema(PhantomData))) + } + Err(ParquetError::General(String::from(""))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + let col_path = ColumnPath::new(path.to_vec()); + let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); + assert_eq!((curr_def_level, curr_rep_level), (col_descr.max_def_level(), col_descr.max_rep_level())); + MapReader(FixedLenByteArrayReader{column: TypedTripletIter::::new(curr_def_level, curr_rep_level, DEFAULT_BATCH_SIZE, col_reader)}, (|bytes: Vec<_>| { + let mut ret = std::mem::MaybeUninit::::uninitialized(); + assert_eq!(bytes.len(), unsafe { ret.get_ref().len() }); + unsafe { + std::ptr::copy_nonoverlapping( + bytes.as_ptr(), + ret.get_mut().as_mut_ptr(), + bytes.len(), + ) + }; + Ok(unsafe { ret.into_inner() }) + }) as fn(_)->_) + } +} + +macro_rules! impl_parquet_deserialize_tuple { + ($($t:ident $i:tt)*) => ( + impl<$($t,)*> Reader for TupleReader<($($t,)*)> where $($t: Reader,)* { + type Item = ($($t::Item,)*); + + fn read_field(&mut self) -> Result { + Ok(( + $((self.0).$i.read_field()?,)* + )) + } + fn advance_columns(&mut self) { + $((self.0).$i.advance_columns();)* + } + fn has_next(&self) -> bool { + // self.$first_name.has_next() + $((self.0).$i.has_next() &&)* true + } + fn current_def_level(&self) -> i16 { + $(if true { (self.0).$i.current_def_level() } else)* + { + panic!("Current definition level: empty group reader") + } + } + fn current_rep_level(&self) -> i16 { + $(if true { (self.0).$i.current_rep_level() } else)* + { + panic!("Current repetition level: empty group reader") + } + } + } + impl<$($t,)*> str::FromStr for RootSchema<($($t,)*),TupleSchema<($((String,$t::Schema,),)*)>> where $($t: Deserialize,)* { + type Err = ParquetError; + + fn from_str(s: &str) -> Result { + parse_message_type(s).and_then(|x| as Deserialize>::parse(&x).map_err(|err| { + // let x: Type = as Deserialize>::render("", & as Deserialize>::placeholder()); + let mut a = Vec::new(); + // print_schema(&mut a, &x); + ParquetError::General(format!( + "Types don't match schema.\nSchema is:\n{}\nBut types require:\n{}\nError: {}", + s, + String::from_utf8(a).unwrap(), + err + )) + })).map(|x|x.1) + } + } + impl<$($t,)*> Debug for TupleSchema<($((String,$t,),)*)> where $($t: Debug,)* { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_tuple("TupleSchema") + $(.field(&(self.0).$i))* + .finish() + } + } + impl<$($t,)*> DebugType for TupleSchema<($((String,$t,),)*)> where $($t: DebugType,)* { + fn fmt(f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.write_str("TupleSchema") + } + } + impl<$($t,)*> Deserialize for Root<($($t,)*)> where $($t: Deserialize,)* { + type Schema = RootSchema<($($t,)*),TupleSchema<($((String,$t::Schema,),)*)>>; + type Reader = RootReader>; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + if schema.is_schema() { + let mut fields = schema.get_fields().iter(); + let schema_ = RootSchema(schema.name().to_owned(), TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x))?,)*)), PhantomData); + if fields.next().is_none() { + return Ok((String::from(""), schema_)) + } + } + Err(ParquetError::General(String::from(""))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + RootReader(<($($t,)*) as Deserialize>::reader(&schema.1, path, curr_def_level, curr_rep_level, paths)) + } + } + impl<$($t,)*> Deserialize for ($($t,)*) where $($t: Deserialize,)* { + type Schema = TupleSchema<($((String,$t::Schema,),)*)>; + type Reader = TupleReader<($($t::Reader,)*)>; + + fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + let mut fields = schema.get_fields().iter(); + let schema_ = TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x))?,)*)); + if fields.next().is_none() { + return Ok((schema.name().to_owned(), schema_)) + } + } + Err(ParquetError::General(String::from(""))) + } + fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + $( + path.push((schema.0).$i.0.to_owned()); + let $t = <$t as Deserialize>::reader(&(schema.0).$i.1, path, curr_def_level, curr_rep_level, paths); + path.pop().unwrap(); + )*; + TupleReader(($($t,)*)) + } + } + // impl<$($t,)*> Deserialize for Option<($($t,)*)> where $($t: Deserialize,)* { + // type Schema = OptionSchema>; + // type Reader = OptionReader>; + + // fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { + // if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::OPTIONAL { + // let mut fields = schema.get_fields().iter(); + // let schema_ = OptionSchema(TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x))?,)*))); + // if fields.next().is_none() { + // return Ok((schema.name().to_owned(), schema_)) + // } + // } + // Err(ParquetError::General(String::from(""))) + // } + // fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap) -> Self::Reader { + // OptionReader{def_level: curr_def_level, reader: <($($t,)*) as Deserialize>::reader(&schema.0, path, curr_def_level+1, curr_rep_level, paths)} + // } + // } + impl<$($t,)*> Downcast<($($t,)*)> for Value where Value: $(Downcast<$t> +)* { + fn downcast(self) -> Result<($($t,)*),ParquetError> { + let mut fields = self.as_group()?.0.into_iter(); + Ok(($({$i;fields.next().unwrap().downcast()?},)*)) + } + } + impl<$($t,)*> Downcast> for ValueSchema where ValueSchema: $(Downcast<$t> +)* { + fn downcast(self) -> Result,ParquetError> { + let group = self.as_group()?; + let mut fields = group.0.into_iter(); + let mut names = vec![None; group.1.len()]; + for (name,&index) in group.1.iter() { + names[index].replace(name.to_owned()); + } + let mut names = names.into_iter().map(Option::unwrap); + Ok(TupleSchema(($({$i;(names.next().unwrap(),fields.next().unwrap().downcast()?)},)*))) + } + } + ); +} + +impl_parquet_deserialize_tuple!(); +impl_parquet_deserialize_tuple!(A 0); +impl_parquet_deserialize_tuple!(A 0 B 1); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27 AC 28); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27 AC 28 AD 29); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27 AC 28 AD 29 AE 30); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27 AC 28 AD 29 AE 30 AF 31); +impl_parquet_deserialize_tuple!(A 0 B 1 C 2 D 3 E 4 F 5 G 6 H 7 I 8 J 9 K 10 L 11 M 12 N 13 O 14 P 15 Q 16 R 17 S 18 T 19 U 20 V 21 W 22 X 23 Y 24 Z 25 AA 26 AB 27 AC 28 AD 29 AE 30 AF 31 AG 32); +