From 5b0e5e25bc07eb3db35ca9441718b3a00ccf7c07 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Thu, 7 Jul 2022 12:33:03 +0200
Subject: [PATCH] update arrow: ipc limit and reduce categorical-> dictionary
 bound checks

---
 polars/polars-arrow/Cargo.toml                |  4 +-
 polars/polars-core/Cargo.toml                 |  8 +--
 .../chunked_array/logical/categorical/from.rs | 56 +++++++++++++++----
 polars/polars-io/Cargo.toml                   |  4 +-
 polars/polars-io/src/ipc/ipc_file.rs          | 15 +++--
 polars/polars-io/src/parquet/write.rs         |  6 +-
 py-polars/Cargo.lock                          |  2 +-
 py-polars/polars/internals/frame.py           |  3 +
 8 files changed, 69 insertions(+), 29 deletions(-)

diff --git a/polars/polars-arrow/Cargo.toml b/polars/polars-arrow/Cargo.toml
index 0572d65bce19..fe791f3c18d8 100644
--- a/polars/polars-arrow/Cargo.toml
+++ b/polars/polars-arrow/Cargo.toml
@@ -9,9 +9,9 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", features = ["compute_concatenate"], default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
-# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", features = ["compute_concatenate"], default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
 hashbrown = "0.12"
 num = "^0.4"
diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml
index 808d89267005..89f46b08ba0f 100644
--- a/polars/polars-core/Cargo.toml
+++ b/polars/polars-core/Cargo.toml
@@ -172,11 +172,11 @@ thiserror = "^1.0"
 
 [dependencies.arrow]
 package = "arrow2"
-git = "https://github.com/jorgecarleitao/arrow2"
-# git = "https://github.com/ritchie46/arrow2"
-rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe"
+# git = "https://github.com/jorgecarleitao/arrow2"
+git = "https://github.com/ritchie46/arrow2"
+# rev = "98e49133b2e56e51e30335830485b3cf768eb5a2"
 # path = "../../../arrow2"
-# branch = "arity_assign"
+branch = "fix_cast_dict"
 # version = "0.12"
 default-features = false
 features = [
diff --git a/polars/polars-core/src/chunked_array/logical/categorical/from.rs b/polars/polars-core/src/chunked_array/logical/categorical/from.rs
index f4a059a3b05f..e42c7fcfe6fd 100644
--- a/polars/polars-core/src/chunked_array/logical/categorical/from.rs
+++ b/polars/polars-core/src/chunked_array/logical/categorical/from.rs
@@ -1,5 +1,6 @@
 use super::*;
 use arrow::array::DictionaryArray;
+use arrow::datatypes::IntegerType;
 use polars_arrow::compute::cast::cast;
 
 impl From<&CategoricalChunked> for DictionaryArray<u32> {
@@ -7,9 +8,19 @@ impl From<&CategoricalChunked> for DictionaryArray<u32> {
         let keys = ca.logical().rechunk();
         let keys = keys.downcast_iter().next().unwrap();
         let map = &**ca.get_rev_map();
+        let dtype = ArrowDataType::Dictionary(
+            IntegerType::UInt32,
+            Box::new(ArrowDataType::LargeUtf8),
+            false,
+        );
         match map {
             RevMapping::Local(arr) => {
-                DictionaryArray::from_data(keys.clone(), Box::new(arr.clone()))
+                // Safety:
+                // the keys are in bounds
+                unsafe {
+                    DictionaryArray::try_new_unchecked(dtype, keys.clone(), Box::new(arr.clone()))
+                        .unwrap()
+                }
             }
             RevMapping::Global(reverse_map, values, _uuid) => {
                 let iter = keys
@@ -17,7 +28,12 @@ impl From<&CategoricalChunked> for DictionaryArray<u32> {
                     .map(|opt_k| opt_k.map(|k| *reverse_map.get(k).unwrap()));
                 let keys = PrimitiveArray::from_trusted_len_iter(iter);
 
-                DictionaryArray::from_data(keys, Box::new(values.clone()))
+                // Safety:
+                // the keys are in bounds
+                unsafe {
+                    DictionaryArray::try_new_unchecked(dtype, keys, Box::new(values.clone()))
+                        .unwrap()
+                }
             }
         }
     }
@@ -27,23 +43,39 @@ impl From<&CategoricalChunked> for DictionaryArray<i64> {
         let keys = ca.logical().rechunk();
         let keys = keys.downcast_iter().next().unwrap();
         let map = &**ca.get_rev_map();
+        let dtype = ArrowDataType::Dictionary(
+            IntegerType::UInt32,
+            Box::new(ArrowDataType::LargeUtf8),
+            false,
+        );
         match map {
-            RevMapping::Local(arr) => DictionaryArray::from_data(
-                cast(keys, &ArrowDataType::Int64)
-                    .unwrap()
-                    .as_any()
-                    .downcast_ref::<PrimitiveArray<i64>>()
-                    .unwrap()
-                    .clone(),
-                Box::new(arr.clone()),
-            ),
+            // Safety:
+            // the keys are in bounds
+            RevMapping::Local(arr) => unsafe {
+                DictionaryArray::try_new_unchecked(
+                    dtype,
+                    cast(keys, &ArrowDataType::Int64)
+                        .unwrap()
+                        .as_any()
+                        .downcast_ref::<PrimitiveArray<i64>>()
+                        .unwrap()
+                        .clone(),
+                    Box::new(arr.clone()),
+                )
+                .unwrap()
+            },
             RevMapping::Global(reverse_map, values, _uuid) => {
                 let iter = keys
                     .into_iter()
                     .map(|opt_k| opt_k.map(|k| *reverse_map.get(k).unwrap() as i64));
                 let keys = PrimitiveArray::from_trusted_len_iter(iter);
 
-                DictionaryArray::from_data(keys, Box::new(values.clone()))
+                // Safety:
+                // the keys are in bounds
+                unsafe {
+                    DictionaryArray::try_new_unchecked(dtype, keys, Box::new(values.clone()))
+                        .unwrap()
+                }
             }
         }
     }
 }
diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml
index 3461c37e7d93..d3b14e3013f0 100644
--- a/polars/polars-io/Cargo.toml
+++ b/polars/polars-io/Cargo.toml
@@ -37,8 +37,8 @@ private = ["polars-time/private"]
 [dependencies]
 ahash = "0.7"
 anyhow = "1.0"
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", default-features = false }
-# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", default-features = false }
 # arrow = { package = "arrow2", version = "0.12", default-features = false }
 # arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
 csv-core = { version = "0.1.10", optional = true }
diff --git a/polars/polars-io/src/ipc/ipc_file.rs b/polars/polars-io/src/ipc/ipc_file.rs
index ab774f5ad49e..19e80a6964a7 100644
--- a/polars/polars-io/src/ipc/ipc_file.rs
+++ b/polars/polars-io/src/ipc/ipc_file.rs
@@ -131,13 +131,14 @@ impl<R: MmapBytesReader> IpcReader<R> {
             metadata.schema.clone()
         };
 
-        let reader = read::FileReader::new(&mut self.reader, metadata, sorted_projection);
+        let reader =
+            read::FileReader::new(&mut self.reader, metadata, sorted_projection, self.n_rows);
 
         let include_row_count = self.row_count.is_some();
         finish_reader(
             reader,
             rechunk,
-            self.n_rows,
+            None,
             predicate,
             aggregate,
             &schema,
@@ -198,12 +199,16 @@ where
         };
 
         let include_row_count = self.row_count.is_some();
-        let ipc_reader =
-            read::FileReader::new(&mut self.reader, metadata.clone(), sorted_projection);
+        let ipc_reader = read::FileReader::new(
+            &mut self.reader,
+            metadata.clone(),
+            sorted_projection,
+            self.n_rows,
+        );
         finish_reader(
             ipc_reader,
             rechunk,
-            self.n_rows,
+            None,
             None,
             None,
             &schema,
diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs
index b593c1a2b251..5dbbbb1afe00 100644
--- a/polars/polars-io/src/parquet/write.rs
+++ b/polars/polars-io/src/parquet/write.rs
@@ -15,9 +15,6 @@ pub use write::{BrotliLevel, CompressionOptions as ParquetCompression, GzipLevel
 
 /// Write a DataFrame to parquet format
 ///
-/// # Example
-///
-///
 #[must_use]
 pub struct ParquetWriter<W> {
     writer: W,
@@ -44,6 +41,9 @@ where
     }
 
     /// Set the compression used. Defaults to `Lz4Raw`.
+    ///
+    /// The default compression `Lz4Raw` has very good performance, but may not yet be supported
+    /// by older readers. If you want more compatibility guarantees, consider using `Snappy`.
     pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self {
         self.compression = compression;
         self
diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock
index 19ae16d0c9a8..03ea114a4c20 100644
--- a/py-polars/Cargo.lock
+++ b/py-polars/Cargo.lock
@@ -74,7 +74,7 @@ dependencies = [
 [[package]]
 name = "arrow2"
 version = "0.12.0"
-source = "git+https://github.com/jorgecarleitao/arrow2?rev=f5f6b7e3aa10b80dc574abacf96b30e0927410fe#f5f6b7e3aa10b80dc574abacf96b30e0927410fe"
+source = "git+https://github.com/ritchie46/arrow2?branch=fix_cast_dict#eeddfac8f16bc745981ab0c7f222ff58e85b6368"
 dependencies = [
  "arrow-format",
  "avro-schema",
diff --git a/py-polars/polars/internals/frame.py b/py-polars/polars/internals/frame.py
index a7accec8de04..6110672808fe 100644
--- a/py-polars/polars/internals/frame.py
+++ b/py-polars/polars/internals/frame.py
@@ -1392,6 +1392,9 @@ def write_parquet(
             - "brotli"
             - "lz4"
             - "zstd"
+
+            The default compression "lz4" (actually lz4raw) has very good performance, but may not yet be supported
+            by older readers. If you want more compatibility guarantees, consider using "snappy".
         compression_level
             Supported by {'gzip', 'brotli', 'zstd'}
             - "gzip"
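
Note on the categorical -> dictionary change (an illustrative sketch, not part of the patch): the `// Safety` comments rely on the invariant that every key coming out of the categorical's rev-map is a valid index into the dictionary values, which is why the bound-checking construction can be skipped. A minimal standalone example of the same arrow2 call, with made-up keys and values, assuming a crate that depends on arrow2 directly (inside Polars the package is renamed to `arrow`):

    use arrow2::array::{Array, DictionaryArray, PrimitiveArray, Utf8Array};
    use arrow2::datatypes::{DataType, IntegerType};

    fn main() {
        // Dictionary values ("categories") and keys indexing into them.
        let values = Utf8Array::<i64>::from_slice(["a", "b", "c"]);
        let keys = PrimitiveArray::<u32>::from_vec(vec![0, 2, 1, 0]);

        // The same dtype the patch builds: u32 keys, large-utf8 values, unordered.
        let dtype = DataType::Dictionary(IntegerType::UInt32, Box::new(DataType::LargeUtf8), false);

        // Sound only because every key (0..=2) is a valid index into `values`;
        // `try_new_unchecked` skips the per-key bound check that a checked
        // constructor would have to perform.
        let dict =
            unsafe { DictionaryArray::try_new_unchecked(dtype, keys, Box::new(values)).unwrap() };
        assert_eq!(dict.len(), 4);
    }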
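
Note on the IPC change (sketch, not part of the patch): on the pinned arrow2 branch, `read::FileReader::new` takes a fourth `limit: Option<usize>` argument, so `IpcReader::n_rows` is pushed down into the reader itself and `finish_reader` receives `None` instead of truncating afterwards. A rough standalone example of that reader API, assuming arrow2 with the `io_ipc` feature and a hypothetical `data.arrow` file:

    use std::fs::File;

    use arrow2::io::ipc::read;

    fn main() {
        // Hypothetical input path; any Arrow IPC file works here.
        let mut file = File::open("data.arrow").expect("file exists");
        let metadata = read::read_file_metadata(&mut file).expect("valid IPC metadata");

        // The fourth argument is the row limit the patch threads through from
        // `IpcReader::n_rows`; the reader stops emitting rows once it is reached.
        let reader = read::FileReader::new(file, metadata, None, Some(100));

        for chunk in reader {
            let chunk = chunk.expect("valid IPC batch");
            println!("chunk with {} rows", chunk.len());
        }
    }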
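
Note on the compression docs (sketch, not part of the patch): picking Snappy over the Lz4Raw default is a one-liner on the Rust side. The file name and data below are made up, the `parquet` feature is assumed, and the `finish(&mut DataFrame)` signature is assumed to match this Polars revision:

    use std::fs::File;

    use polars::prelude::*;

    fn main() {
        // Throwaway frame; column names and values are only for the example.
        let mut df = df!("a" => &[1i64, 2, 3], "b" => &["x", "y", "z"]).unwrap();

        // Snappy trades a little compression ratio for the widest reader
        // compatibility; the Lz4Raw default is faster but newer.
        let file = File::create("example.parquet").expect("create file");
        ParquetWriter::new(file)
            .with_compression(ParquetCompression::Snappy)
            .finish(&mut df)
            .expect("write parquet");
    }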