diff --git a/.dockerignore b/.dockerignore index 083905c7439..eb71138c679 100644 --- a/.dockerignore +++ b/.dockerignore @@ -55,6 +55,8 @@ !rust/arrow-flight/Cargo.toml !rust/parquet/Cargo.toml !rust/parquet/build.rs +!rust/parquet_derive/Cargo.toml +!rust/parquet_derive_test/Cargo.toml !rust/datafusion/Cargo.toml !rust/datafusion/benches !rust/integration-testing/Cargo.toml diff --git a/ci/docker/debian-10-rust.dockerfile b/ci/docker/debian-10-rust.dockerfile index b23b03c9a1c..9c9c9b51048 100644 --- a/ci/docker/debian-10-rust.dockerfile +++ b/ci/docker/debian-10-rust.dockerfile @@ -58,14 +58,18 @@ RUN mkdir \ /arrow/rust/benchmarks/src \ /arrow/rust/datafusion/src \ /arrow/rust/integration-testing/src \ - /arrow/rust/parquet/src && \ + /arrow/rust/parquet/src \ + /arrow/rust/parquet_derive/src \ + /arrow/rust/parquet_derive_test/src && \ touch \ /arrow/rust/arrow-flight/src/lib.rs \ /arrow/rust/arrow/src/lib.rs \ /arrow/rust/benchmarks/src/lib.rs \ /arrow/rust/datafusion/src/lib.rs \ /arrow/rust/integration-testing/src/lib.rs \ - /arrow/rust/parquet/src/lib.rs + /arrow/rust/parquet/src/lib.rs \ + /arrow/rust/parquet_derive/src/lib.rs \ + /arrow/rust/parquet_derive_test/src/lib.rs # Compile dependencies for the whole workspace RUN cd /arrow/rust && cargo build --workspace --lib --all-features diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index e6841aa85a8..eb5859c8424 100644 --- a/dev/release/00-prepare-test.rb +++ b/dev/release/00-prepare-test.rb @@ -330,6 +330,35 @@ def test_version_pre_tag "+See [crate documentation](https://docs.rs/crate/parquet/#{@release_version}) on available API."], ], }, + { + path: "rust/parquet_derive/Cargo.toml", + hunks: [ + ["-version = \"#{@snapshot_version}\"", + "+version = \"#{@release_version}\""], + ["-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"], + ], + }, + { + path: "rust/parquet_derive/README.md", + hunks: [ + ["-parquet = \"#{@snapshot_version}\"", + "-parquet_derive = \"#{@snapshot_version}\"", + "+parquet = \"#{@release_version}\"", + "+parquet_derive = \"#{@release_version}\""], + ], + }, + { + path: "rust/parquet_derive_test/Cargo.toml", + hunks: [ + ["-version = \"#{@snapshot_version}\"", + "+version = \"#{@release_version}\"", + "-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", + "-parquet_derive = { path = \"../parquet_derive\", version = \"#{@snapshot_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }"], + ], + }, ], parse_patch(git("log", "-n", "1", "-p"))) end @@ -537,6 +566,35 @@ def test_version_post_tag "+See [crate documentation](https://docs.rs/crate/parquet/#{@next_snapshot_version}) on available API."], ], }, + { + path: "rust/parquet_derive/Cargo.toml", + hunks: [ + ["-version = \"#{@release_version}\"", + "+version = \"#{@next_snapshot_version}\""], + ["-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"], + ], + }, + { + path: "rust/parquet_derive/README.md", + hunks: [ + ["-parquet = \"#{@release_version}\"", + "-parquet_derive = \"#{@release_version}\"", + "+parquet = \"#{@next_snapshot_version}\"", + "+parquet_derive = \"#{@next_snapshot_version}\""], + ], + }, + { + path: "rust/parquet_derive_test/Cargo.toml", + hunks: [ + ["-version 
= \"#{@release_version}\"", + "+version = \"#{@next_snapshot_version}\"", + "-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "-parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }", + "+parquet_derive = { path = \"../parquet_derive\", version = \"#{@next_snapshot_version}\" }"], + ], + }, ], parse_patch(git("log", "-n", "1", "-p"))) end diff --git a/dev/release/00-prepare.sh b/dev/release/00-prepare.sh index 1678c0228da..df5283e01fa 100755 --- a/dev/release/00-prepare.sh +++ b/dev/release/00-prepare.sh @@ -151,29 +151,17 @@ update_versions() { -e "s/^(arrow = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ -e "s/^(arrow-flight = .* version = )\".+\"( .*)/\\1\"${version}\"\\2/g" \ -e "s/^(parquet = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ + -e "s/^(parquet_derive = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ */Cargo.toml rm -f */Cargo.toml.bak git add */Cargo.toml - # Update version number for parquet README - sed -i.bak -E -e \ - "s/^parquet = \".+\"/parquet = \"${version}\"/g" \ - parquet/README.md - sed -i.bak -E -e \ - "s/docs.rs\/crate\/parquet\/.+\)/docs.rs\/crate\/parquet\/${version}\)/g" \ - parquet/README.md - rm -f parquet/README.md.bak - git add parquet/README.md - - # Update version number for datafusion README - sed -i.bak -E -e \ - "s/^datafusion = \".+\"/datafusion = \"${version}\"/g" \ - datafusion/README.md - sed -i.bak -E -e \ - "s/docs.rs\/crate\/datafusion\/.+\)/docs.rs\/crate\/datafusion\/${version}\)/g" \ - datafusion/README.md - rm -f datafusion/README.md.bak - git add datafusion/README.md + sed -i.bak -E \ + -e "s/^([^ ]+) = \".+\"/\\1 = \"${version}\"/g" \ + -e "s,docs\.rs/crate/([^/]+)/[^)]+,docs.rs/crate/\\1/${version},g" \ + */README.md + rm -f */README.md.bak + git add */README.md cd - } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0cb529fb690..459fe8fd4ed 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -19,6 +19,8 @@ members = [ "arrow", "parquet", + "parquet_derive", + "parquet_derive_test", "datafusion", "arrow-flight", "integration-testing", diff --git a/rust/parquet/src/record/mod.rs b/rust/parquet/src/record/mod.rs index 4427ada72af..ab615148460 100644 --- a/rust/parquet/src/record/mod.rs +++ b/rust/parquet/src/record/mod.rs @@ -19,8 +19,10 @@ mod api; pub mod reader; +mod record_writer; mod triplet; -pub use self::api::{ - List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowFormatter, +pub use self::{ + api::{List, ListAccessor, Map, MapAccessor, Row, RowAccessor}, + record_writer::RecordWriter, }; diff --git a/rust/parquet/src/record/record_writer.rs b/rust/parquet/src/record/record_writer.rs new file mode 100644 index 00000000000..00ce9fd4e47 --- /dev/null +++ b/rust/parquet/src/record/record_writer.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::super::errors::ParquetError;
+use super::super::file::writer::RowGroupWriter;
+
+pub trait RecordWriter<T> {
+    fn write_to_row_group(
+        &self,
+        row_group_writer: &mut Box<RowGroupWriter>,
+    ) -> Result<(), ParquetError>;
+}
diff --git a/rust/parquet_derive/Cargo.toml b/rust/parquet_derive/Cargo.toml
new file mode 100644
index 00000000000..b4debaf410b
--- /dev/null
+++ b/rust/parquet_derive/Cargo.toml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "parquet_derive"
+version = "2.0.0-SNAPSHOT"
+authors = ["Apache Arrow <dev@arrow.apache.org>"]
+keywords = [ "parquet" ]
+edition = "2018"
+
+[lib]
+proc-macro = true
+
+[features]
+chrono = []
+bigdecimal = []
+uuid = []
+
+[dependencies]
+proc-macro2 = "1.0.8"
+quote = "1.0.2"
+syn = { version = "1.0.14", features = ["full", "extra-traits"] }
+parquet = { path = "../parquet", version = "2.0.0-SNAPSHOT" }
diff --git a/rust/parquet_derive/README.md b/rust/parquet_derive/README.md
new file mode 100644
index 00000000000..6d3f6696ef7
--- /dev/null
+++ b/rust/parquet_derive/README.md
@@ -0,0 +1,98 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Parquet Derive
+
+A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested
+structures. It only works for primitives, a few generic structures, and
+various levels of reference. Please see the Features checklist below for what is currently
+supported.
+
+The derive macro also has some support for the chrono time library. You must enable the `chrono` feature to get this support.
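+
+For example, to opt in to chrono support you would enable the feature on
+this crate in your `Cargo.toml` (a sketch; match the version to your
+`parquet` dependency):
+
+```toml
+[dependencies]
+parquet_derive = { version = "2.0.0-SNAPSHOT", features = ["chrono"] }
+```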
+
+## Usage
+Add this to your Cargo.toml:
+```toml
+[dependencies]
+parquet = "2.0.0-SNAPSHOT"
+parquet_derive = "2.0.0-SNAPSHOT"
+```
+
+and this to your crate root:
+```rust
+extern crate parquet;
+#[macro_use] extern crate parquet_derive;
+```
+
+Example usage of deriving a `RecordWriter` for your struct:
+
+```rust
+use parquet;
+use parquet::record::RecordWriter;
+
+#[derive(ParquetRecordWriter)]
+struct ACompleteRecord<'a> {
+  pub a_bool: bool,
+  pub a_str: &'a str,
+  pub a_string: String,
+  pub a_borrowed_string: &'a String,
+  pub maybe_a_str: Option<&'a str>,
+  pub magic_number: i32,
+  pub low_quality_pi: f32,
+  pub high_quality_pi: f64,
+  pub maybe_pi: Option<f32>,
+  pub maybe_best_pi: Option<f64>,
+}
+
+// Initialize your parquet file
+let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+let mut row_group = writer.next_row_group().unwrap();
+
+// Build up your records
+let chunks = vec![ACompleteRecord{...}];
+
+// The derived `RecordWriter` takes over here
+(&chunks[..]).write_to_row_group(&mut row_group).unwrap();
+
+writer.close_row_group(row_group).unwrap();
+writer.close().unwrap();
+```
+
+## Features
+- [X] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
+- [ ] Support writing dictionaries
+- [X] Support writing logical types like timestamp
+- [X] Derive definition_levels for `Option`
+- [ ] Derive definition levels for nested structures
+- [ ] Derive writing tuple struct
+- [ ] Derive writing `tuple` container types
+
+## Requirements
+- Same as `parquet-rs`
+
+## Test
+Testing a `*_derive` crate requires an intermediate crate. Go to `parquet_derive_test` and run `cargo test` for
+unit tests.
+
+## Docs
+To build documentation, run `cargo doc --no-deps`.
+To compile and view in the browser, run `cargo doc --no-deps --open`.
+
+## License
+Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
diff --git a/rust/parquet_derive/src/lib.rs b/rust/parquet_derive/src/lib.rs
new file mode 100644
index 00000000000..35a538f932c
--- /dev/null
+++ b/rust/parquet_derive/src/lib.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![recursion_limit = "128"]
+
+extern crate proc_macro;
+extern crate proc_macro2;
+extern crate syn;
+#[macro_use]
+extern crate quote;
+
+extern crate parquet;
+
+use syn::{parse_macro_input, Data, DataStruct, DeriveInput};
+
+mod parquet_field;
+
+/// Derive flat, simple `RecordWriter` implementations. Works by parsing
+/// a struct tagged with `#[derive(ParquetRecordWriter)]` and emitting
+/// the correct writing code for each field of the struct. Column writers
+/// are generated in the order they are defined.
+///
+/// It is up to the programmer to keep the order of the struct
+/// fields lined up with the schema.
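+/// Fields are matched to schema columns by position, not by name, so a
+/// column may be named differently from its struct field; only the order
+/// and the physical types have to agree.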
+///
+/// Example:
+///
+/// ```ignore
+/// use parquet;
+/// use parquet::record::RecordWriter;
+/// use parquet::schema::parser::parse_message_type;
+/// use parquet::file::properties::WriterProperties;
+/// use parquet::file::writer::SerializedFileWriter;
+///
+/// use std::rc::Rc;
+///
+/// #[derive(ParquetRecordWriter)]
+/// struct ACompleteRecord<'a> {
+///   pub a_bool: bool,
+///   pub a_str: &'a str,
+/// }
+///
+/// let schema_str = "message schema {
+///   REQUIRED boolean a_bool;
+///   REQUIRED BINARY a_str (UTF8);
+/// }";
+///
+/// pub fn write_some_records() {
+///   let samples = vec![
+///     ACompleteRecord {
+///       a_bool: true,
+///       a_str: "I'm true"
+///     },
+///     ACompleteRecord {
+///       a_bool: false,
+///       a_str: "I'm false"
+///     }
+///   ];
+///
+///   let schema = Rc::new(parse_message_type(schema_str).unwrap());
+///
+///   let props = Rc::new(WriterProperties::builder().build());
+///   let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+///
+///   let mut row_group = writer.next_row_group().unwrap();
+///   samples.as_slice().write_to_row_group(&mut row_group).unwrap();
+///   writer.close_row_group(row_group).unwrap();
+///   writer.close().unwrap();
+/// }
+/// ```
+///
+#[proc_macro_derive(ParquetRecordWriter)]
+pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
+    let input: DeriveInput = parse_macro_input!(input as DeriveInput);
+    let fields = match input.data {
+        Data::Struct(DataStruct { fields, .. }) => fields,
+        Data::Enum(_) => unimplemented!("Enum currently is not supported"),
+        Data::Union(_) => unimplemented!("Union currently is not supported"),
+    };
+
+    let field_infos: Vec<_> = fields
+        .iter()
+        .map(|f: &syn::Field| parquet_field::Field::from(f))
+        .collect();
+
+    let writer_snippets: Vec<proc_macro2::TokenStream> =
+        field_infos.iter().map(|x| x.writer_snippet()).collect();
+
+    let derived_for = input.ident;
+    let generics = input.generics;
+
+    (quote! {
+        impl#generics RecordWriter<#derived_for#generics> for &[#derived_for#generics] {
+            fn write_to_row_group(&self, row_group_writer: &mut Box<parquet::file::writer::RowGroupWriter>) -> Result<(), parquet::errors::ParquetError> {
+                let mut row_group_writer = row_group_writer;
+                let records = &self; // Used by all the writer snippets to be more clear
+
+                #(
+                    {
+                        let mut some_column_writer = row_group_writer.next_column().unwrap();
+                        if let Some(mut column_writer) = some_column_writer {
+                            #writer_snippets
+                            row_group_writer.close_column(column_writer)?;
+                        } else {
+                            return Err(parquet::errors::ParquetError::General("Failed to get next column".into()))
+                        }
+                    }
+                );*
+
+                Ok(())
+            }
+        }
+    }).into()
+}
diff --git a/rust/parquet_derive/src/parquet_field.rs b/rust/parquet_derive/src/parquet_field.rs
new file mode 100644
index 00000000000..6b74743ce34
--- /dev/null
+++ b/rust/parquet_derive/src/parquet_field.rs
@@ -0,0 +1,931 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
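+
+// Each struct field is parsed into a `Field`: its ident, a simplified
+// `Type`, whether it is written as a byte buffer, and any detected
+// third-party type. `Field::writer_snippet` then emits the column
+// writing code for that field.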
+
+#[derive(Debug, PartialEq)]
+pub struct Field {
+    ident: syn::Ident,
+    ty: Type,
+    is_a_byte_buf: bool,
+    third_party_type: Option<ThirdPartyType>,
+}
+
+/// Use third party libraries, detected
+/// at compile time. These libraries will
+/// be written to parquet as their preferred
+/// physical type.
+///
+/// ChronoNaiveDateTime is written as i64
+/// ChronoNaiveDate is written as i32
+#[derive(Debug, PartialEq)]
+enum ThirdPartyType {
+    ChronoNaiveDateTime,
+    ChronoNaiveDate,
+    Uuid,
+}
+
+impl Field {
+    pub fn from(f: &syn::Field) -> Self {
+        let ty = Type::from(f);
+        let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
+
+        let third_party_type = match &ty.last_part()[..] {
+            "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
+            "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
+            "Uuid" => Some(ThirdPartyType::Uuid),
+            _ => None,
+        };
+
+        Field {
+            ident: f
+                .ident
+                .clone()
+                .expect("Only structs with named fields are currently supported"),
+            ty,
+            is_a_byte_buf,
+            third_party_type,
+        }
+    }
+
+    /// Takes the parsed field of the struct and emits a valid
+    /// column writer snippet. Should match exactly what you
+    /// would write by hand.
+    ///
+    /// Can only generate writers for basic structs, for example:
+    ///
+    /// struct Record {
+    ///   a_bool: bool,
+    ///   maybe_a_bool: Option<bool>
+    /// }
+    ///
+    /// but not
+    ///
+    /// struct UnsupportedNestedRecord {
+    ///   a_property: bool,
+    ///   nested_record: Record
+    /// }
+    ///
+    /// because this parsing logic is not sophisticated enough for definition
+    /// levels beyond 2.
+    pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
+        let ident = &self.ident;
+        let column_writer = self.ty.column_writer();
+
+        let vals_builder = match &self.ty {
+            Type::TypePath(_) => self.copied_direct_vals(),
+            Type::Option(ref first_type) => match **first_type {
+                Type::TypePath(_) => self.option_into_vals(),
+                Type::Reference(_, ref second_type) => match **second_type {
+                    Type::TypePath(_) => self.option_into_vals(),
+                    _ => unimplemented!("Unsupported type encountered"),
+                },
+                ref f @ _ => unimplemented!("Unsupported: {:#?}", f),
+            },
+            Type::Reference(_, ref first_type) => match **first_type {
+                Type::TypePath(_) => self.copied_direct_vals(),
+                Type::Option(ref second_type) => match **second_type {
+                    Type::TypePath(_) => self.option_into_vals(),
+                    Type::Reference(_, ref second_type) => match **second_type {
+                        Type::TypePath(_) => self.option_into_vals(),
+                        _ => unimplemented!("Unsupported type encountered"),
+                    },
+                    ref f @ _ => unimplemented!("Unsupported: {:#?}", f),
+                },
+                ref f @ _ => unimplemented!("Unsupported: {:#?}", f),
+            },
+            f @ _ => unimplemented!("Unsupported: {:#?}", f),
+        };
+
+        let definition_levels = match &self.ty {
+            Type::TypePath(_) => None,
+            Type::Option(ref first_type) => match **first_type {
+                Type::TypePath(_) => Some(self.optional_definition_levels()),
+                Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
+                Type::Reference(_, ref second_type)
+                | Type::Vec(ref second_type)
+                | Type::Array(ref second_type) => match **second_type {
+                    Type::TypePath(_) => Some(self.optional_definition_levels()),
+                    _ => unimplemented!("Unsupported nesting encountered"),
+                },
+            },
+            Type::Reference(_, ref first_type)
+            | Type::Vec(ref first_type)
+            | Type::Array(ref first_type) => match **first_type {
+                Type::TypePath(_) => None,
+                Type::Reference(_, ref second_type)
+                | Type::Vec(ref second_type)
+                | Type::Array(ref second_type)
+                | Type::Option(ref second_type) => match **second_type {
+                    Type::TypePath(_) => Some(self.optional_definition_levels()),
+                    Type::Reference(_, ref third_type) => match **third_type {
+                        Type::TypePath(_) => Some(self.optional_definition_levels()),
+                        _ => unimplemented!("Unsupported definition encountered"),
+                    },
+                    _ => unimplemented!("Unsupported definition encountered"),
+                },
+            },
+        };
+
+        // "vals" is the run of primitive data being written for the column
+        // "definition_levels" is a vector of 1s and 0s (i16) which controls whether a value is missing or present
+        // this TokenStream is only one part of the code for writing a column and
+        // it relies on values calculated in prior code snippets, namely "definition_levels" and "vals_builder".
+        // All the context is put together in this function's final quote and
+        // this expression just switches between non-nullable and nullable write statements
+        let write_batch_expr = if definition_levels.is_some() {
+            quote! {
+                if let #column_writer(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
+                }
+            }
+        } else {
+            quote! {
+                if let #column_writer(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], None, None)?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
+                }
+            }
+        };
+
+        quote! {
+            {
+                #definition_levels
+
+                #vals_builder
+
+                #write_batch_expr
+            }
+        }
+    }
+
+    fn option_into_vals(&self) -> proc_macro2::TokenStream {
+        let field_name = &self.ident;
+        let is_a_byte_buf = self.is_a_byte_buf;
+        let is_a_timestamp =
+            self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
+        let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
+        let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
+        let copy_to_vec = match self.ty.physical_type() {
+            parquet::basic::Type::BYTE_ARRAY
+            | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => false,
+            _ => true,
+        };
+
+        let binding = if copy_to_vec {
+            quote! { let Some(inner) = rec.#field_name }
+        } else {
+            quote! { let Some(ref inner) = rec.#field_name }
+        };
+
+        let some = if is_a_timestamp {
+            quote! { Some(inner.timestamp_millis()) }
+        } else if is_a_date {
+            quote! { Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) }
+        } else if is_a_uuid {
+            quote! { Some((&inner.to_string()[..]).into()) }
+        } else if is_a_byte_buf {
+            quote! { Some((&inner[..]).into())}
+        } else {
+            quote! { Some(inner) }
+        };
+
+        quote! {
+            let vals: Vec<_> = records.iter().filter_map(|rec| {
+                if #binding {
+                    #some
+                } else {
+                    None
+                }
+            }).collect();
+        }
+    }
+
+    fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
+        let field_name = &self.ident;
+        let is_a_byte_buf = self.is_a_byte_buf;
+        let is_a_timestamp =
+            self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
+        let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
+        let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
+
+        let access = if is_a_timestamp {
+            quote! { rec.#field_name.timestamp_millis() }
+        } else if is_a_date {
+            quote! { rec.#field_name.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
+        } else if is_a_uuid {
+            quote! { (&rec.#field_name.to_string()[..]).into() }
+        } else if is_a_byte_buf {
+            quote! { (&rec.#field_name[..]).into() }
+        } else {
+            quote! { rec.#field_name }
+        };
+
+        quote! {
+            let vals: Vec<_> = records.iter().map(|rec| #access).collect();
+        }
+    }
+
+    fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
+        let field_name = &self.ident;
+
+        quote! {
+            let definition_levels: Vec<i16> = self
+                .iter()
+                .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
+                .collect();
+        }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+enum Type {
+    Array(Box<Type>),
+    Option(Box<Type>),
+    Vec(Box<Type>),
+    TypePath(syn::Type),
+    Reference(Option<syn::Lifetime>, Box<Type>),
+}
+
+impl Type {
+    /// Takes a rust type and returns the appropriate
+    /// parquet-rs column writer
+    fn column_writer(&self) -> syn::TypePath {
+        use parquet::basic::Type as BasicType;
+
+        match self.physical_type() {
+            BasicType::BOOLEAN => {
+                syn::parse_quote!(parquet::column::writer::ColumnWriter::BoolColumnWriter)
+            }
+            BasicType::INT32 => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::Int32ColumnWriter
+            ),
+            BasicType::INT64 => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::Int64ColumnWriter
+            ),
+            BasicType::INT96 => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::Int96ColumnWriter
+            ),
+            BasicType::FLOAT => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::FloatColumnWriter
+            ),
+            BasicType::DOUBLE => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::DoubleColumnWriter
+            ),
+            BasicType::BYTE_ARRAY => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::ByteArrayColumnWriter
+            ),
+            BasicType::FIXED_LEN_BYTE_ARRAY => syn::parse_quote!(
+                parquet::column::writer::ColumnWriter::FixedLenByteArrayColumnWriter
+            ),
+        }
+    }
+
+    /// Helper to simplify a nested field definition to its leaf type
+    ///
+    /// Ex:
+    ///   Option<&String> => Type::TypePath(String)
+    ///   &Option<i32> => Type::TypePath(i32)
+    ///   Vec<Vec<u8>> => Type::Vec(u8)
+    ///
+    /// Useful in determining the physical type of a field and the
+    /// definition levels.
+    fn leaf_type_recursive(&self) -> &Type {
+        self.leaf_type_recursive_helper(self, None)
+    }
+
+    fn leaf_type_recursive_helper<'a>(
+        &'a self,
+        ty: &'a Type,
+        parent_ty: Option<&'a Type>,
+    ) -> &Type {
+        match ty {
+            Type::TypePath(_) => parent_ty.unwrap_or(ty),
+            Type::Option(ref first_type)
+            | Type::Vec(ref first_type)
+            | Type::Array(ref first_type)
+            | Type::Reference(_, ref first_type) => {
+                self.leaf_type_recursive_helper(first_type, Some(ty))
+            }
+        }
+    }
+
+    /// Helper method to further unwrap leaf_type() to get inner-most
+    /// type information, useful for determining the physical type
+    /// and normalizing the type paths.
+    fn inner_type(&self) -> &syn::Type {
+        let leaf_type = self.leaf_type_recursive();
+
+        match leaf_type {
+            Type::TypePath(ref type_) => type_,
+            Type::Option(ref first_type)
+            | Type::Vec(ref first_type)
+            | Type::Array(ref first_type)
+            | Type::Reference(_, ref first_type) => match **first_type {
+                Type::TypePath(ref type_) => type_,
+                _ => unimplemented!("leaf_type() should only return shallow types"),
+            },
+        }
+    }
+
+    /// Helper to normalize a type path by extracting the
+    /// most identifiable part
+    ///
+    /// Ex:
+    ///   std::string::String => String
+    ///   Vec<u8> => u8
+    ///   chrono::NaiveDateTime => NaiveDateTime
+    ///
+    /// Does run the risk of mis-identifying a type if import
+    /// rename is in play. Please note procedural macros always
+    /// run before type resolution so this is a risk the user
+    /// takes on when renaming imports.
+    fn last_part(&self) -> String {
+        let inner_type = self.inner_type();
+        let inner_type_str = (quote! { #inner_type }).to_string();
+
+        inner_type_str
+            .split("::")
+            .last()
+            .unwrap()
+            .trim()
+            .to_string()
+    }
+
+    /// Converts rust types to parquet physical types.
+    ///
+    /// Ex:
+    ///   [u8; 10] => FIXED_LEN_BYTE_ARRAY
+    ///   Vec<u8> => BYTE_ARRAY
+    ///   String => BYTE_ARRAY
+    ///   i32 => INT32
+    fn physical_type(&self) -> parquet::basic::Type {
+        use parquet::basic::Type as BasicType;
+
+        let last_part = self.last_part();
+        let leaf_type = self.leaf_type_recursive();
+
+        match leaf_type {
+            Type::Array(ref first_type) => {
+                if let Type::TypePath(_) = **first_type {
+                    if last_part == "u8" {
+                        return BasicType::FIXED_LEN_BYTE_ARRAY;
+                    }
+                }
+            }
+            Type::Vec(ref first_type) => {
+                if let Type::TypePath(_) = **first_type {
+                    if last_part == "u8" {
+                        return BasicType::BYTE_ARRAY;
+                    }
+                }
+            }
+            _ => (),
+        }
+
+        match last_part.trim() {
+            "bool" => BasicType::BOOLEAN,
+            "u8" | "u16" | "u32" => BasicType::INT32,
+            "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
+            "u64" | "i64" | "usize" | "NaiveDateTime" => BasicType::INT64,
+            "f32" => BasicType::FLOAT,
+            "f64" => BasicType::DOUBLE,
+            "String" | "str" | "Uuid" => BasicType::BYTE_ARRAY,
+            f @ _ => unimplemented!("{} currently is not supported", f),
+        }
+    }
+
+    /// Convert a parsed rust field AST into a more easily manipulated
+    /// parquet_derive::Field
+    fn from(f: &syn::Field) -> Self {
+        Type::from_type(f, &f.ty)
+    }
+
+    fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
+        match ty {
+            syn::Type::Path(ref p) => Type::from_type_path(f, p),
+            syn::Type::Reference(ref tr) => Type::from_type_reference(f, tr),
+            syn::Type::Array(ref ta) => Type::from_type_array(f, ta),
+            other @ _ => unimplemented!(
+                "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
+                f.ident.as_ref().unwrap(),
+                other
+            ),
+        }
+    }
+
+    fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
+        let last_segment = p.path.segments.last().unwrap();
+
+        let is_vec =
+            last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
+        let is_option = last_segment.ident
+            == syn::Ident::new("Option", proc_macro2::Span::call_site());
+
+        if is_vec || is_option {
+            let generic_type = match &last_segment.arguments {
+                syn::PathArguments::AngleBracketed(angle_args) => {
+                    assert_eq!(angle_args.args.len(), 1);
+                    let first_arg = &angle_args.args[0];
+
+                    match first_arg {
+                        syn::GenericArgument::Type(ref typath) => typath.clone(),
+                        other @ _ => unimplemented!("Unsupported: {:#?}", other),
+                    }
+                }
+                other @ _ => unimplemented!("Unsupported: {:#?}", other),
+            };
+
+            if is_vec {
+                Type::Vec(Box::new(Type::from_type(f, &generic_type)))
+            } else {
+                Type::Option(Box::new(Type::from_type(f, &generic_type)))
+            }
+        } else {
+            Type::TypePath(syn::Type::Path(p.clone()))
+        }
+    }
+
+    fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
+        let lifetime = tr.lifetime.clone();
+        let inner_type = Type::from_type(f, tr.elem.as_ref());
+        Type::Reference(lifetime, Box::new(inner_type))
+    }
+
+    fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
+        let inner_type = Type::from_type(f, ta.elem.as_ref());
+        Type::Array(Box::new(inner_type))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use syn::{self, Data, DataStruct, DeriveInput};
+
+    fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
+        let input: DeriveInput = syn::parse2(input).unwrap();
+
+        let fields = match input.data {
+            Data::Struct(DataStruct { fields, .. }) => fields,
+            _ => panic!("Input must be a struct"),
+        };
+
+        fields.iter().map(|field| field.to_owned()).collect()
+    }
+
+    #[test]
+    fn test_generating_a_simple_writer_snippet() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ABoringStruct {
+            counter: usize,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let counter = Field::from(&fields[0]);
+
+        let snippet = counter.writer_snippet().to_string();
+        assert_eq!(snippet,
+                   (quote!{
+                        {
+                            let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter ) . collect ( );
+
+                            if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer {
+                                typed . write_batch ( & vals [ .. ] , None , None ) ?;
+                            } else {
+                                panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
+                            }
+                        }
+                   }).to_string()
+        )
+    }
+
+    #[test]
+    fn test_optional_to_writer_snippet() {
+        let struct_def: proc_macro2::TokenStream = quote! {
+          struct StringBorrower<'a> {
+            optional_str: Option<&'a str>,
+            optional_string: &Option<String>,
+            optional_dumb_int: &Option<&i32>,
+          }
+        };
+
+        let fields = extract_fields(struct_def);
+
+        let optional = Field::from(&fields[0]);
+        let snippet = optional.writer_snippet();
+        assert_eq!(snippet.to_string(),
+          (quote! {
+            {
+                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
+
+                let vals: Vec <_> = records.iter().filter_map( |rec| {
+                    if let Some ( ref inner ) = rec . optional_str {
+                        Some ( (&inner[..]).into() )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
+                }
+            }
+        }
+        ).to_string());
+
+        let optional = Field::from(&fields[1]);
+        let snippet = optional.writer_snippet();
+        assert_eq!(snippet.to_string(),
+          (quote!{
+            {
+                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
+
+                let vals: Vec <_> = records.iter().filter_map( |rec| {
+                    if let Some ( ref inner ) = rec . optional_string {
+                        Some ( (&inner[..]).into() )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
+                }
+            }
+        }).to_string());
+
+        let optional = Field::from(&fields[2]);
+        let snippet = optional.writer_snippet();
+        assert_eq!(snippet.to_string(),
+          (quote!{
+            {
+                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
+
+                let vals: Vec <_> = records.iter().filter_map( |rec| {
+                    if let Some ( inner ) = rec . optional_dumb_int {
+                        Some ( inner )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer {
+                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
+                }
+            }
+        }).to_string());
+    }
+
+    #[test]
+    fn test_converting_to_column_writer_type() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ABasicStruct {
+            yes_no: bool,
+            name: String,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let processed: Vec<_> = fields.iter().map(|field| Field::from(field)).collect();
+
+        let column_writers: Vec<_> = processed
+            .iter()
+            .map(|field| field.ty.column_writer())
+            .collect();
+
+        assert_eq!(
+            column_writers,
+            vec![
+                syn::parse_quote!(
+                    parquet::column::writer::ColumnWriter::BoolColumnWriter
+                ),
+                syn::parse_quote!(
+                    parquet::column::writer::ColumnWriter::ByteArrayColumnWriter
+                )
+            ]
+        );
+    }
+
+    #[test]
+    fn convert_basic_struct() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ABasicStruct {
+            yes_no: bool,
+            name: String,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let processed: Vec<_> = fields.iter().map(|field| Field::from(field)).collect();
+        assert_eq!(processed.len(), 2);
+
+        assert_eq!(
+            processed,
+            vec![
+                Field {
+                    ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
+                    ty: Type::TypePath(syn::parse_quote!(bool)),
+                    is_a_byte_buf: false,
+                    third_party_type: None,
+                },
+                Field {
+                    ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
+                    ty: Type::TypePath(syn::parse_quote!(String)),
+                    is_a_byte_buf: true,
+                    third_party_type: None,
+                }
+            ]
+        )
+    }
+
+    #[test]
+    fn test_get_inner_type() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct LotsOfInnerTypes {
+            a_vec: Vec<u8>,
+            a_option: std::option::Option<bool>,
+            a_silly_string: std::string::String,
+            a_complicated_thing: std::option::Option<std::result::Result<(),()>>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let converted_fields: Vec<_> =
+            fields.iter().map(|field| Type::from(field)).collect();
+        let inner_types: Vec<_> = converted_fields
+            .iter()
+            .map(|field| field.inner_type())
+            .collect();
+        let inner_types_strs: Vec<_> = inner_types
+            .iter()
+            .map(|ty| (quote! { #ty }).to_string())
+            .collect();
+
+        assert_eq!(
+            inner_types_strs,
+            vec![
+                "u8",
+                "bool",
+                "std :: string :: String",
+                "std :: result :: Result < ( ) , ( ) >"
+            ]
+        )
+    }
+
+    #[test]
+    fn test_physical_type() {
+        use parquet::basic::Type as BasicType;
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct LotsOfInnerTypes {
+            a_buf: Vec<u8>,
+            a_number: i32,
+            a_verbose_option: std::option::Option<bool>,
+            a_silly_string: std::string::String,
+            a_fix_byte_buf: [u8; 10],
+            a_complex_option: Option<&Vec<u8>>,
+            a_complex_vec: &Vec<&Option<i32>>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let converted_fields: Vec<_> =
+            fields.iter().map(|field| Type::from(field)).collect();
+        let physical_types: Vec<_> = converted_fields
+            .iter()
+            .map(|ty| ty.physical_type())
+            .collect();
+
+        assert_eq!(
+            physical_types,
+            vec![
+                BasicType::BYTE_ARRAY,
+                BasicType::INT32,
+                BasicType::BOOLEAN,
+                BasicType::BYTE_ARRAY,
+                BasicType::FIXED_LEN_BYTE_ARRAY,
+                BasicType::BYTE_ARRAY,
+                BasicType::INT32
+            ]
+        )
+    }
+
+    #[test]
+    fn test_convert_comprehensive_owned_struct() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct VecHolder {
+            a_vec: Vec<u8>,
+            a_option: std::option::Option<bool>,
+            a_silly_string: std::string::String,
+            a_complicated_thing: std::option::Option<std::result::Result<(),()>>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let converted_fields: Vec<_> =
+            fields.iter().map(|field| Type::from(field)).collect();
+
+        assert_eq!(
+            converted_fields,
+            vec![
+                Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))),
+                Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))),
+                Type::TypePath(syn::parse_quote!(std::string::String)),
+                Type::Option(Box::new(Type::TypePath(
+                    syn::parse_quote!(std::result::Result<(),()>)
+                ))),
+            ]
+        );
+    }
+
+    #[test]
+    fn test_convert_borrowed_struct() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct Borrower<'a> {
+            a_str: &'a str,
+            a_borrowed_option: &'a Option<bool>,
+            so_many_borrows: &'a Option<&'a str>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let types: Vec<_> = fields.iter().map(|field| Type::from(field)).collect();
+
+        assert_eq!(
+            types,
+            vec![
+                Type::Reference(
+                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
+                    Box::new(Type::TypePath(syn::parse_quote!(str)))
+                ),
+                Type::Reference(
+                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
+                    Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
+                        bool
+                    )))))
+                ),
+                Type::Reference(
+                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
+                    Box::new(Type::Option(Box::new(Type::Reference(
+                        Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
+                        Box::new(Type::TypePath(syn::parse_quote!(str)))
+                    ))))
+                ),
+            ]
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "chrono")]
+    fn test_chrono_timestamp_millis() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ATimestampStruct {
+            henceforth: chrono::NaiveDateTime,
+            maybe_happened: Option<&chrono::NaiveDateTime>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let when = Field::from(&fields[0]);
+        assert_eq!(when.writer_snippet().to_string(),(quote!{
+            {
+                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], None, None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
+                }
+            }
+        }).to_string());
+
+        let maybe_happened = Field::from(&fields[1]);
+        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
+            {
+                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
+                let vals : Vec<_> = records.iter().filter_map(|rec| {
+                    if let Some(inner) = rec.maybe_happened {
+                        Some( inner.timestamp_millis() )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
+                }
+            }
+        }).to_string());
+    }
+
+    #[test]
+    #[cfg(feature = "chrono")]
+    fn test_chrono_date() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ATimestampStruct {
+            henceforth: chrono::NaiveDate,
+            maybe_happened: Option<&chrono::NaiveDate>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let when = Field::from(&fields[0]);
+        assert_eq!(when.writer_snippet().to_string(),(quote!{
+            {
+                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], None, None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
+                }
+            }
+        }).to_string());
+
+        let maybe_happened = Field::from(&fields[1]);
+        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
+            {
+                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
+                let vals : Vec<_> = records.iter().filter_map(|rec| {
+                    if let Some(inner) = rec.maybe_happened {
+                        Some( inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
+                }
+            }
+        }).to_string());
+    }
+
+    #[test]
+    #[cfg(feature = "uuid")]
+    fn test_uuid() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ATimestampStruct {
+            unique_id: uuid::Uuid,
+            maybe_unique_id: Option<&uuid::Uuid>,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+        let when = Field::from(&fields[0]);
+        assert_eq!(when.writer_snippet().to_string(),(quote!{
+            {
+                let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], None, None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
+                }
+            }
+        }).to_string());
+
+        let maybe_happened = Field::from(&fields[1]);
+        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
+            {
+                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
+                let vals : Vec<_> = records.iter().filter_map(|rec| {
+                    if let Some(ref inner) = rec.maybe_unique_id {
+                        Some( (&inner.to_string()[..]).into() )
+                    } else {
+                        None
+                    }
+                }).collect();
+
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
+                } else {
+                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
+                }
+            }
+        }).to_string());
+    }
+}
diff --git a/rust/parquet_derive_test/Cargo.toml b/rust/parquet_derive_test/Cargo.toml
new file mode 100644
index 00000000000..fc5af3efd4b
--- /dev/null
+++ b/rust/parquet_derive_test/Cargo.toml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "parquet_derive_test"
+version = "2.0.0-SNAPSHOT"
+authors = ["Apache Arrow <dev@arrow.apache.org>"]
+keywords = [ "parquet" ]
+edition = "2018"
+
+[dependencies]
+parquet = { path = "../parquet", version = "2.0.0-SNAPSHOT" }
+parquet_derive = { path = "../parquet_derive", version = "2.0.0-SNAPSHOT" }
diff --git a/rust/parquet_derive_test/src/lib.rs b/rust/parquet_derive_test/src/lib.rs
new file mode 100644
index 00000000000..aca4dc57049
--- /dev/null
+++ b/rust/parquet_derive_test/src/lib.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate parquet;
+
+#[macro_use]
+extern crate parquet_derive;
+
+use parquet::record::RecordWriter;
+
+#[derive(ParquetRecordWriter)]
+struct ACompleteRecord<'a> {
+    pub a_bool: bool,
+    pub a_str: &'a str,
+    pub a_string: String,
+    pub a_borrowed_string: &'a String,
+    pub maybe_a_str: Option<&'a str>,
+    pub maybe_a_string: Option<String>,
+    pub magic_number: i32,
+    pub low_quality_pi: f32,
+    pub high_quality_pi: f64,
+    pub maybe_pi: Option<f32>,
+    pub maybe_best_pi: Option<f64>,
+    pub borrowed_maybe_a_string: &'a Option<String>,
+    pub borrowed_maybe_a_str: &'a Option<&'a str>,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use parquet::{
+        file::{
+            properties::WriterProperties,
+            writer::{FileWriter, SerializedFileWriter},
+        },
+        schema::parser::parse_message_type,
+    };
+    use std::{env, fs, io::Write, rc::Rc};
+
+    #[test]
+    fn test_parquet_derive_hello() {
+        let file = get_temp_file("test_parquet_derive_hello", &[]);
+        let schema_str = "message schema {
+            REQUIRED boolean a_bool;
+            REQUIRED BINARY a_str (UTF8);
+            REQUIRED BINARY a_string (UTF8);
+            REQUIRED BINARY a_borrowed_string (UTF8);
+            OPTIONAL BINARY a_maybe_str (UTF8);
+            OPTIONAL BINARY a_maybe_string (UTF8);
+            REQUIRED INT32 magic_number;
+            REQUIRED FLOAT low_quality_pi;
+            REQUIRED DOUBLE high_quality_pi;
+            OPTIONAL FLOAT maybe_pi;
+            OPTIONAL DOUBLE maybe_best_pi;
+            OPTIONAL BINARY borrowed_maybe_a_string (UTF8);
+            OPTIONAL BINARY borrowed_maybe_a_str (UTF8);
+        }";
+
+        let schema = Rc::new(parse_message_type(schema_str).unwrap());
+
+        let props = Rc::new(WriterProperties::builder().build());
+        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
+
+        let a_str = "hello mother".to_owned();
+        let a_borrowed_string = "cool news".to_owned();
+        let maybe_a_string = Some("it's true, I'm a string".to_owned());
+        let maybe_a_str = Some(&a_str[..]);
+
+        let drs: Vec<ACompleteRecord> = vec![ACompleteRecord {
+            a_bool: true,
+            a_str: &a_str[..],
+            a_string: "hello father".into(),
+            a_borrowed_string: &a_borrowed_string,
+            maybe_a_str: Some(&a_str[..]),
+            maybe_a_string: Some(a_str.clone()),
+            magic_number: 100,
+            low_quality_pi: 3.14,
+            high_quality_pi: 3.1415,
+            maybe_pi: Some(3.14),
+            maybe_best_pi: Some(3.1415),
+            borrowed_maybe_a_string: &maybe_a_string,
+            borrowed_maybe_a_str: &maybe_a_str,
+        }];
+
+        let mut row_group = writer.next_row_group().unwrap();
+        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
+        writer.close_row_group(row_group).unwrap();
+        writer.close().unwrap();
+    }
+
+    /// Returns a file handle for a temp file in the 'target' directory with the provided content
+    pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
+        // build tmp path to a file in "target/debug/testdata"
+        let mut path_buf = env::current_dir().unwrap();
+        path_buf.push("target");
+        path_buf.push("debug");
+        path_buf.push("testdata");
+        fs::create_dir_all(&path_buf).unwrap();
+        path_buf.push(file_name);
+
+        // write file content
+        let mut tmp_file = fs::File::create(path_buf.as_path()).unwrap();
+        tmp_file.write_all(content).unwrap();
+        tmp_file.sync_all().unwrap();
+
+        // return file handle for both read and write
+        let file = fs::OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(path_buf.as_path());
+        assert!(file.is_ok());
+        file.unwrap()
+    }
+}