Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust): inline parquet2 #12026

Merged
merged 6 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use hashbrown::hash_map::RawEntryMut;
use once_cell::sync::Lazy;
use smartstring::{LazyCompact, SmartString};

use crate::datatypes::PlIdHashMap;
use crate::datatypes::{InitHashMaps2, PlIdHashMap};
use crate::hashing::_HASHMAP_INIT_SIZE;
use crate::prelude::InitHashMaps;

/// We use atomic reference counting to determine how many threads use the
/// string cache. If the refcount is zero, we may clear the string cache.
Expand Down
57 changes: 4 additions & 53 deletions crates/polars-core/src/datatypes/aliases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub use arrow::legacy::index::{IdxArr, IdxSize};
pub use polars_utils::aliases::{InitHashMaps, PlHashMap, PlHashSet, PlIndexMap, PlIndexSet};

use super::*;
use crate::hashing::IdBuildHasher;
Expand All @@ -21,68 +22,18 @@ pub type IdxType = UInt32Type;
#[cfg(feature = "bigidx")]
pub type IdxType = UInt64Type;

pub type PlHashMap<K, V> = hashbrown::HashMap<K, V, RandomState>;
/// This hashmap has the uses an IdHasher
/// This hashmap uses an IdHasher
pub type PlIdHashMap<K, V> = hashbrown::HashMap<K, V, IdBuildHasher>;
pub type PlHashSet<V> = hashbrown::HashSet<V, RandomState>;
pub type PlIndexMap<K, V> = indexmap::IndexMap<K, V, RandomState>;
pub type PlIndexSet<K> = indexmap::IndexSet<K, RandomState>;

pub trait InitHashMaps {
pub trait InitHashMaps2 {
type HashMap;

fn new() -> Self::HashMap;

fn with_capacity(capacity: usize) -> Self::HashMap;
}

impl<K, V> InitHashMaps for PlHashMap<K, V> {
type HashMap = Self;

fn new() -> Self::HashMap {
Self::with_capacity_and_hasher(0, Default::default())
}

fn with_capacity(capacity: usize) -> Self {
Self::with_capacity_and_hasher(capacity, Default::default())
}
}
impl<K> InitHashMaps for PlHashSet<K> {
type HashMap = Self;

fn new() -> Self::HashMap {
Self::with_capacity_and_hasher(0, Default::default())
}

fn with_capacity(capacity: usize) -> Self {
Self::with_capacity_and_hasher(capacity, Default::default())
}
}

impl<K> InitHashMaps for PlIndexSet<K> {
type HashMap = Self;

fn new() -> Self::HashMap {
Self::with_capacity_and_hasher(0, Default::default())
}

fn with_capacity(capacity: usize) -> Self::HashMap {
Self::with_capacity_and_hasher(capacity, Default::default())
}
}

impl<K, V> InitHashMaps for PlIndexMap<K, V> {
type HashMap = Self;

fn new() -> Self::HashMap {
Self::with_capacity_and_hasher(0, Default::default())
}

fn with_capacity(capacity: usize) -> Self::HashMap {
Self::with_capacity_and_hasher(capacity, Default::default())
}
}
impl<K, V> InitHashMaps for PlIdHashMap<K, V> {
impl<K, V> InitHashMaps2 for PlIdHashMap<K, V> {
type HashMap = Self;

fn new() -> Self::HashMap {
Expand Down
1 change: 0 additions & 1 deletion crates/polars-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Add, AddAssign, Div, Mul, Rem, Sub, SubAssign};

use ahash::RandomState;
pub use aliases::*;
pub use any_value::*;
use arrow::compute::comparison::Simd8;
Expand Down
1 change: 0 additions & 1 deletion crates/polars-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ description = "Error definitions for the Polars DataFrame library"
arrow-format = { version = "0.8.1", optional = true }
avro-schema = { workspace = true, optional = true }
object_store = { workspace = true, optional = true }
parquet2 = { workspace = true, optional = true }
regex = { workspace = true, optional = true }
simdutf8 = { workspace = true }
thiserror = { workspace = true }
Expand Down
7 changes: 0 additions & 7 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ impl From<object_store::Error> for PolarsError {
}
}

#[cfg(feature = "parquet2")]
impl From<parquet2::error::Error> for PolarsError {
fn from(err: parquet2::error::Error) -> Self {
polars_err!(ComputeError: "parquet error: {err:?}")
}
}

#[cfg(feature = "avro-schema")]
impl From<avro_schema::error::Error> for PolarsError {
fn from(value: avro_schema::error::Error) -> Self {
Expand Down
34 changes: 24 additions & 10 deletions crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@ ethnum = { workspace = true }
fallible-streaming-iterator = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
num-traits = { workspace = true }
parquet2 = { workspace = true, optional = true, default-features = true, features = ["async"] }
polars-error = { workspace = true, features = ["parquet2"] }
polars-error = { workspace = true }
polars-utils = { workspace = true }
simdutf8 = { workspace = true }

[features]
bloom_filter = ["parquet2/bloom_filter"]
async = ["futures"]
parquet-format-safe = "0.2"
seq-macro = { version = "0.3", default-features = false }
streaming-decompression = "0.1"

async-stream = { version = "0.3.3", optional = true }

brotli = { version = "^3.3", optional = true }
flate2 = { version = "^1.0", optional = true, default-features = false }
lz4 = { version = "1.24", optional = true }
serde = { version = "^1.0", optional = true, features = ["derive"] }
snap = { version = "^1.1", optional = true }
zstd = { version = "^0.12", optional = true, default-features = false }

xxhash-rust = { version = "0.8", optional = true, features = ["xxh64"] }

[features]
compression = [
"zstd",
"gzip",
Expand All @@ -37,8 +49,10 @@ compression = [
]

# compression backends
zstd = ["parquet2/zstd"]
snappy = ["parquet2/snappy"]
gzip = ["parquet2/gzip"]
lz4 = ["parquet2/lz4"]
brotli = ["parquet2/brotli"]
snappy = ["snap"]
gzip = ["flate2/rust_backend"]
gzip_zlib_ng = ["flate2/zlib-ng"]

async = ["async-stream", "futures", "parquet-format-safe/async"]
bloom_filter = ["xxhash-rust"]
serde_types = ["serde"]
2 changes: 1 addition & 1 deletion crates/polars-parquet/LICENSE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Some of the code in this crate is subject to the Apache 2 license below, as it
was taken from the arrow2 Rust crate in October 2023. Later changes are subject
was taken from the arrow2 and parquet2 Rust crates in October 2023. Later changes are subject
to the MIT license in ../../LICENSE.


Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pub mod write;

#[cfg(feature = "io_parquet_bloom_filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_bloom_filter")))]
pub use parquet2::bloom_filter;
pub use crate::parquet::bloom_filter;

const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ use arrow::array::{Array, BinaryArray, Utf8Array};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{DataType, PhysicalType};
use arrow::offset::Offset;
use parquet2::deserialize::SliceFilteredIter;
use parquet2::encoding::{delta_length_byte_array, hybrid_rle, Encoding};
use parquet2::page::{split_buffer, DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::{to_compute_err, PolarsResult};

use super::super::utils::{
Expand All @@ -17,6 +13,10 @@ use super::super::utils::{
};
use super::super::{utils, Pages};
use super::utils::*;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

#[derive(Debug)]
pub(super) struct Required<'a> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use arrow::array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Utf8Array
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{DataType, PhysicalType};
use arrow::offset::Offset;
use parquet2::page::DictPage;
use polars_error::PolarsResult;

use super::super::dictionary::*;
use super::super::utils::MaybeNext;
use super::super::Pages;
use super::utils::{Binary, SizedBinaryIter};
use crate::arrow::read::deserialize::nested_utils::{InitNested, NestedState};
use crate::parquet::page::DictPage;

/// An iterator adapter over [`Pages`] assumed to be encoded as parquet's dictionary-encoded binary representation
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use arrow::array::Array;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use arrow::offset::Offset;
use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::PolarsResult;

use super::super::nested_utils::*;
Expand All @@ -15,6 +12,9 @@ use super::super::utils::MaybeNext;
use super::basic::{deserialize_plain, finish, Dict, ValuesDictionary};
use super::utils::*;
use crate::arrow::read::Pages;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

#[derive(Debug)]
enum State<'a> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ use arrow::array::BooleanArray;
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::deserialize::SliceFilteredIter;
use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::PolarsResult;

use super::super::utils::{
extend_from_decoder, get_selected_rows, next, DecodedState, Decoder,
FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity,
};
use super::super::{utils, Pages};
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

#[derive(Debug)]
struct Values<'a>(BitmapIter<'a>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use arrow::array::BooleanArray;
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::PolarsResult;

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::super::{utils, Pages};
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

// The state of a `DataPage` of `Boolean` parquet boolean type
#[allow(clippy::large_enum_variant)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use std::collections::VecDeque;
use arrow::array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::deserialize::SliceFilteredIter;
use parquet2::encoding::hybrid_rle::HybridRleDecoder;
use parquet2::encoding::Encoding;
use parquet2::page::{DataPage, DictPage, Page};
use parquet2::schema::Repetition;

use super::utils::{
self, dict_indices_decoder, extend_from_decoder, get_selected_rows, DecodedState, Decoder,
FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity,
};
use super::Pages;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DataPage, DictPage, Page};
use crate::parquet::schema::Repetition;

// The state of a `DataPage` of `Primitive` parquet primitive type
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use std::collections::VecDeque;
use arrow::array::{Array, DictionaryArray, DictionaryKey};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::encoding::hybrid_rle::HybridRleDecoder;
use parquet2::encoding::Encoding;
use parquet2::page::{DataPage, DictPage, Page};
use parquet2::schema::Repetition;
use polars_error::{polars_err, PolarsResult};

use super::super::super::Pages;
use super::super::nested_utils::*;
use super::super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState};
use super::finish_key;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DataPage, DictPage, Page};
use crate::parquet::schema::Repetition;

// The state of a required DataPage with a boolean physical type
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ use std::collections::VecDeque;
use arrow::array::FixedSizeBinaryArray;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::deserialize::SliceFilteredIter;
use parquet2::encoding::{hybrid_rle, Encoding};
use parquet2::page::{split_buffer, DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::PolarsResult;

use super::super::utils::{
Expand All @@ -16,6 +12,10 @@ use super::super::utils::{
};
use super::super::Pages;
use super::utils::FixedSizeBinary;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{hybrid_rle, Encoding};
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

pub(super) type Dict = Vec<u8>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::collections::VecDeque;
use arrow::array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::page::DictPage;
use polars_error::PolarsResult;

use super::super::dictionary::*;
use super::super::utils::MaybeNext;
use super::super::Pages;
use crate::arrow::read::deserialize::nested_utils::{InitNested, NestedState};
use crate::parquet::page::DictPage;

/// An iterator adapter over [`Pages`] assumed to be encoded as parquet's dictionary-encoded binary representation
#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ use std::collections::VecDeque;
use arrow::array::FixedSizeBinaryArray;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::DataType;
use parquet2::encoding::Encoding;
use parquet2::page::{DataPage, DictPage};
use parquet2::schema::Repetition;
use polars_error::PolarsResult;

use super::super::utils::{not_implemented, MaybeNext, PageState};
Expand All @@ -16,6 +13,9 @@ use crate::arrow::read::deserialize::fixed_size_binary::basic::{
use crate::arrow::read::deserialize::nested_utils::{next, NestedDecoder};
use crate::arrow::read::deserialize::utils::Pushable;
use crate::arrow::read::{InitNested, NestedState, Pages};
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DataPage, DictPage};
use crate::parquet::schema::Repetition;

#[derive(Debug)]
enum State<'a> {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ mod utils;
use arrow::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray};
use arrow::datatypes::{DataType, Field, IntervalUnit};
use arrow::offset::Offsets;
use parquet2::read::get_page_iterator as _get_page_iterator;
use parquet2::schema::types::PrimitiveType;
use simple::page_iter_to_arrays;

pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState};
pub use self::struct_::StructIterator;
use super::*;
use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::schema::types::PrimitiveType;

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<R: Read + Seek>(
Expand Down
Loading