This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit bfb4910
Migrated to parquet2 v0.3 (#265)
jorgecarleitao authored Aug 10, 2021
1 parent 6b6acb8 commit bfb4910
Showing 29 changed files with 121 additions and 123 deletions.
Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -58,7 +58,7 @@ futures = { version = "0.3", optional = true }
 # for faster hashing
 ahash = { version = "0.7", optional = true }

-parquet2 = { version = "0.2", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.3", optional = true, default_features = false, features = ["stream"] }

 [dev-dependencies]
 rand = "0.8"
arrow-parquet-integration-testing/src/main.rs (19 changes: 11 additions & 8 deletions)
@@ -2,14 +2,17 @@ use std::fs::File;
 use std::sync::Arc;
 use std::{collections::HashMap, convert::TryFrom, io::Read};

-use arrow2::datatypes::DataType;
-use arrow2::error::Result;
-use arrow2::io::parquet::write::{Encoding, RowGroupIterator};
-use arrow2::io::{
-    json_integration::ArrowJson,
-    parquet::write::{write_file, CompressionCodec, Version, WriteOptions},
+use arrow2::{
+    datatypes::{DataType, Schema},
+    error::Result,
+    io::{
+        json_integration::{to_record_batch, ArrowJson},
+        parquet::write::{
+            write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+        },
+    },
+    record_batch::RecordBatch,
 };
-use arrow2::{datatypes::Schema, io::json_integration::to_record_batch, record_batch::RecordBatch};

 use clap::{App, Arg};

@@ -154,7 +157,7 @@ fn main() -> Result<()> {

     let options = WriteOptions {
         write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
         version,
     };
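The rename from `CompressionCodec` to `Compression` is mechanical and recurs in most files below. A minimal sketch of an updated call site, using only names that appear in this commit:

use arrow2::io::parquet::write::{Compression, Version, WriteOptions};

// parquet2 v0.3 renames the codec enum from `CompressionCodec` to
// `Compression`; the fields of `WriteOptions` are otherwise unchanged.
fn default_options() -> WriteOptions {
    WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    }
}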
benches/write_parquet.rs (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {

     let options = WriteOptions {
         write_statistics: false,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
         version: Version::V1,
     };
examples/parquet_write.rs (4 changes: 2 additions & 2 deletions)
@@ -7,7 +7,7 @@ use arrow2::{
     datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_page, write_file, CompressionCodec, DynIter, Encoding, Version, WriteOptions,
+        array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
     },
 };

@@ -16,7 +16,7 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>

     let options = WriteOptions {
         write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
         version: Version::V2,
     };
     let encoding = Encoding::Plain;
examples/parquet_write_record.rs (7 changes: 4 additions & 3 deletions)
@@ -5,17 +5,18 @@ use arrow2::{
     array::{Array, Int32Array},
     datatypes::{Field, Schema},
     error::Result,
-    io::parquet::write::{write_file, CompressionCodec, RowGroupIterator, Version, WriteOptions},
+    io::parquet::write::{
+        write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+    },
     record_batch::RecordBatch,
 };
-use parquet2::schema::Encoding;

 fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {
     let schema = batch.schema().clone();

     let options = WriteOptions {
         write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
         version: Version::V2,
     };
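With `Encoding` now re-exported from `arrow2::io::parquet::write`, this example drops its direct `parquet2` import. The sketch below shows how the pieces plausibly fit together; the exact signatures of `RowGroupIterator::try_new` and `write_file` are assumptions inferred from the imports in this commit, not shown in the diff:

use std::fs::File;

use arrow2::{
    error::Result,
    io::parquet::write::{
        write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
    },
    record_batch::RecordBatch,
};

// Assumed wiring: one record batch, one row group, plain encoding.
fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {
    let schema = batch.schema().clone();
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(batch)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain],
    )?;
    let parquet_schema = row_groups.parquet_schema().clone();
    let mut file = File::create(path)?;
    let _ = write_file(&mut file, row_groups, &schema, parquet_schema, options, None)?;
    Ok(())
}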
src/io/parquet/mod.rs (4 changes: 2 additions & 2 deletions)
@@ -405,7 +405,7 @@ mod tests {
 mod tests_integration {
     use std::sync::Arc;

-    use super::write::CompressionCodec;
+    use super::write::Compression;
     use crate::array::{Array, PrimitiveArray, Utf8Array};
     use crate::datatypes::DataType;
     use crate::datatypes::TimeUnit;
@@ -421,7 +421,7 @@
     fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
         let options = WriteOptions {
             write_statistics: true,
-            compression: CompressionCodec::Uncompressed,
+            compression: Compression::Uncompressed,
             version: Version::V1,
         };
src/io/parquet/read/binary/basic.rs (4 changes: 2 additions & 2 deletions)
@@ -1,7 +1,7 @@
 use parquet2::{
     encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{BinaryPageDict, DataPage, DataPageHeader},
+    page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
     read::{levels, StreamingIterator},
 };

@@ -214,7 +214,7 @@ fn extend_from_page<O: Offset>(
     let is_optional = descriptor.max_def_level() == 1;
     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

             let (_, validity_buffer, values_buffer) =
                 levels::split_buffer_v1(page.buffer(), false, is_optional);
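This hunk introduces the second recurring pattern: in parquet2 v0.3 the v1 data-page header exposes its level encodings through accessor methods on the `DataPageHeaderExt` trait instead of public fields, so the trait is added to each reader's imports and every field access gains parentheses. A minimal sketch of the pattern, using only items visible in this diff:

use parquet2::{
    encoding::Encoding,
    page::{DataPage, DataPageHeader, DataPageHeaderExt},
};

// v0.2: header.definition_level_encoding   (public field)
// v0.3: header.definition_level_encoding() (method from `DataPageHeaderExt`)
fn assert_rle_levels(page: &DataPage) {
    if let DataPageHeader::V1(header) = page.header() {
        assert_eq!(header.definition_level_encoding(), Encoding::Rle);
        assert_eq!(header.repetition_level_encoding(), Encoding::Rle);
    }
}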
src/io/parquet/read/binary/nested.rs (10 changes: 5 additions & 5 deletions)
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use parquet2::{
     encoding::Encoding,
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
     read::{
         levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
         StreamingIterator,
@@ -121,8 +121,8 @@ fn extend_from_page<O: Offset>(

     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

             match (&page.encoding(), page.dictionary_page()) {
                 (Encoding::Plain, None) => {
@@ -137,11 +137,11 @@ fn extend_from_page<O: Offset>(
                     values_buffer,
                     additional,
                     (
-                        &header.repetition_level_encoding,
+                        &header.repetition_level_encoding(),
                         descriptor.max_rep_level(),
                     ),
                     (
-                        &header.definition_level_encoding,
+                        &header.definition_level_encoding(),
                         descriptor.max_def_level(),
                     ),
                     is_nullable,
src/io/parquet/read/boolean/basic.rs (4 changes: 2 additions & 2 deletions)
@@ -8,7 +8,7 @@ use super::super::utils;
 use parquet2::{
     encoding::{hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
     read::{levels, StreamingIterator},
 };

@@ -97,7 +97,7 @@ fn extend_from_page(
     let is_optional = descriptor.max_def_level() == 1;
     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

             match (&page.encoding(), page.dictionary_page(), is_optional) {
                 (Encoding::Plain, None, true) => {
src/io/parquet/read/boolean/nested.rs (10 changes: 5 additions & 5 deletions)
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use parquet2::{
     encoding::Encoding,
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
     read::{
         levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
         StreamingIterator,
@@ -107,8 +107,8 @@ fn extend_from_page(

     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

             match (&page.encoding(), page.dictionary_page()) {
                 (Encoding::Plain, None) => {
@@ -123,11 +123,11 @@ fn extend_from_page(
                     values_buffer,
                     additional,
                     (
-                        &header.repetition_level_encoding,
+                        &header.repetition_level_encoding(),
                         descriptor.max_rep_level(),
                     ),
                     (
-                        &header.definition_level_encoding,
+                        &header.definition_level_encoding(),
                         descriptor.max_def_level(),
                     ),
                     is_nullable,
src/io/parquet/read/fixed_size_binary.rs (4 changes: 2 additions & 2 deletions)
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-    page::{DataPage, DataPageHeader, FixedLenByteArrayPageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
     read::{levels, StreamingIterator},
 };

@@ -171,7 +171,7 @@ pub(crate) fn extend_from_page(
     let is_optional = descriptor.max_def_level() == 1;
     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

             let (_, validity_buffer, values_buffer) =
                 levels::split_buffer_v1(page.buffer(), false, is_optional);
src/io/parquet/read/mod.rs (6 changes: 3 additions & 3 deletions)
@@ -27,8 +27,8 @@ pub use parquet2::{
         decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
         streaming_iterator, Decompressor, PageIterator, StreamingIterator,
     },
-    schema::{
-        types::{LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType},
+    schema::types::{
+        LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
         TimeUnit as ParquetTimeUnit, TimestampType,
     },
     types::int96_to_i64_ns,
@@ -43,7 +43,7 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
     buffer: Vec<u8>,
 ) -> Result<PageIterator<'b, RR>> {
     Ok(_get_page_iterator(
-        metadata, row_group, column, reader, buffer,
+        metadata, row_group, column, reader, None, buffer,
     )?)
 }
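parquet2 v0.3's `get_page_iterator` takes one extra argument, which arrow2 fills with `None` (its meaning is not visible in this diff), so the public wrapper keeps its v0.2 shape and downstream callers compile unchanged. A sketch of an unaffected caller, assuming the wrapper above plus a matching `read_metadata` wrapper implied by the `_read_metadata` re-export:

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read::{get_page_iterator, read_metadata};

// Unchanged caller: row group 0, column 0, and a reusable scratch buffer.
fn open_pages(path: &str) -> Result<()> {
    let mut reader = File::open(path)?;
    let metadata = read_metadata(&mut reader)?;
    let _pages = get_page_iterator(&metadata, 0, 0, &mut reader, vec![])?;
    Ok(())
}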
src/io/parquet/read/primitive/basic.rs (4 changes: 2 additions & 2 deletions)
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
     read::levels,
     types::NativeType,
 };

@@ -160,7 +160,7 @@ where
     let is_optional = descriptor.max_def_level() == 1;
     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

             let (_, validity_buffer, values_buffer) =
                 levels::split_buffer_v1(page.buffer(), false, is_optional);
src/io/parquet/read/primitive/dictionary.rs (7 changes: 3 additions & 4 deletions)
@@ -1,10 +1,9 @@
 use std::sync::Arc;

 use parquet2::{
-    encoding::{bitpacking, hybrid_rle, uleb128},
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
     read::{levels, StreamingIterator},
-    schema::Encoding,
     types::NativeType,
 };

@@ -102,7 +101,7 @@ where
     let is_optional = descriptor.max_def_level() == 1;
     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

             let (_, validity_buffer, values_buffer) =
                 levels::split_buffer_v1(page.buffer(), false, is_optional);
src/io/parquet/read/primitive/nested.rs (10 changes: 5 additions & 5 deletions)
@@ -1,6 +1,6 @@
 use parquet2::{
     encoding::Encoding,
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
     read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
     types::NativeType,
 };

@@ -127,8 +127,8 @@ where

     match page.header() {
         DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

             match (&page.encoding(), page.dictionary_page()) {
                 (Encoding::Plain, None) => {
@@ -143,11 +143,11 @@ where
                     values_buffer,
                     additional,
                     (
-                        &header.repetition_level_encoding,
+                        &header.repetition_level_encoding(),
                         descriptor.max_rep_level(),
                     ),
                     (
-                        &header.definition_level_encoding,
+                        &header.definition_level_encoding(),
                         descriptor.max_def_level(),
                     ),
                     is_nullable,
src/io/parquet/read/schema/convert.rs (4 changes: 2 additions & 2 deletions)
@@ -6,9 +6,9 @@ use parquet2::{
     schema::{
         types::{
             BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType,
-            PrimitiveConvertedType, TimeUnit as ParquetTimeUnit,
+            PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType,
         },
-        Repetition, TimestampType,
+        Repetition,
     },
 };
src/io/parquet/read/utils.rs (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
-use parquet2::{encoding::get_length, schema::Encoding};
+use parquet2::encoding::{get_length, Encoding};

 use crate::error::ArrowError;
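This one-liner is the cleanest statement of the third recurring pattern: `Encoding` moved from `parquet2::schema` to `parquet2::encoding`, so only import paths change. The same substitution drives every write-side hunk that follows:

// parquet2 v0.2: use parquet2::{encoding::get_length, schema::Encoding};
// parquet2 v0.3: the enum lives next to the other encoding utilities.
use parquet2::encoding::{get_length, Encoding};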
src/io/parquet/write/binary/nested.rs (5 changes: 3 additions & 2 deletions)
@@ -1,5 +1,6 @@
-use parquet2::schema::Encoding;
-use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
+use parquet2::{
+    encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
+};

 use super::super::{levels, utils};
 use super::basic::{build_statistics, encode_plain};
src/io/parquet/write/boolean/basic.rs (3 changes: 1 addition & 2 deletions)
@@ -1,8 +1,7 @@
 use parquet2::{
-    encoding::hybrid_rle::bitpacked_encode,
+    encoding::{hybrid_rle::bitpacked_encode, Encoding},
     metadata::ColumnDescriptor,
     page::CompressedDataPage,
-    schema::Encoding,
     statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics},
     write::WriteOptions,
 };
src/io/parquet/write/boolean/nested.rs (5 changes: 3 additions & 2 deletions)
@@ -1,5 +1,6 @@
-use parquet2::schema::Encoding;
-use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
+use parquet2::{
+    encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
+};

 use super::super::{levels, utils};
 use super::basic::{build_statistics, encode_plain};
src/io/parquet/write/dictionary.rs (11 changes: 6 additions & 5 deletions)
@@ -1,8 +1,9 @@
-use parquet2::encoding::hybrid_rle::encode_u32;
-use parquet2::page::{CompressedDictPage, CompressedPage};
-use parquet2::schema::Encoding;
-use parquet2::write::DynIter;
-use parquet2::{metadata::ColumnDescriptor, write::WriteOptions};
+use parquet2::{
+    encoding::{hybrid_rle::encode_u32, Encoding},
+    metadata::ColumnDescriptor,
+    page::{CompressedDictPage, CompressedPage},
+    write::{DynIter, WriteOptions},
+};

 use super::binary::encode_plain as binary_encode_plain;
 use super::primitive::encode_plain as primitive_encode_plain;
src/io/parquet/write/fixed_len_bytes.rs (4 changes: 2 additions & 2 deletions)
@@ -1,6 +1,6 @@
 use parquet2::{
-    compression::create_codec, metadata::ColumnDescriptor, page::CompressedDataPage,
-    schema::Encoding, write::WriteOptions,
+    compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor,
+    page::CompressedDataPage, write::WriteOptions,
 };

 use super::utils;
(diff truncated: 22 of the 29 changed files are shown above)
