Skip to content

Commit

Permalink
update arrow: ipc limit and reduce categorical-> dictionary bound checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 8, 2022
1 parent d248d00 commit 5b0e5e2
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 29 deletions.
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ thiserror = "^1.0"

[dependencies.arrow]
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe"
# git = "https://github.com/jorgecarleitao/arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "98e49133b2e56e51e30335830485b3cf768eb5a2"
# path = "../../../arrow2"
# branch = "arity_assign"
branch = "fix_cast_dict"
# version = "0.12"
default-features = false
features = [
Expand Down
56 changes: 44 additions & 12 deletions polars/polars-core/src/chunked_array/logical/categorical/from.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
use super::*;
use arrow::array::DictionaryArray;
use arrow::datatypes::IntegerType;
use polars_arrow::compute::cast::cast;

impl From<&CategoricalChunked> for DictionaryArray<u32> {
fn from(ca: &CategoricalChunked) -> Self {
let keys = ca.logical().rechunk();
let keys = keys.downcast_iter().next().unwrap();
let map = &**ca.get_rev_map();
let dtype = ArrowDataType::Dictionary(
IntegerType::UInt32,
Box::new(ArrowDataType::LargeUtf8),
false,
);
match map {
RevMapping::Local(arr) => {
DictionaryArray::from_data(keys.clone(), Box::new(arr.clone()))
// Safety:
// the keys are in bounds
unsafe {
DictionaryArray::try_new_unchecked(dtype, keys.clone(), Box::new(arr.clone()))
.unwrap()
}
}
RevMapping::Global(reverse_map, values, _uuid) => {
let iter = keys
.into_iter()
.map(|opt_k| opt_k.map(|k| *reverse_map.get(k).unwrap()));
let keys = PrimitiveArray::from_trusted_len_iter(iter);

DictionaryArray::from_data(keys, Box::new(values.clone()))
// Safety:
// the keys are in bounds
unsafe {
DictionaryArray::try_new_unchecked(dtype, keys, Box::new(values.clone()))
.unwrap()
}
}
}
}
Expand All @@ -27,23 +43,39 @@ impl From<&CategoricalChunked> for DictionaryArray<i64> {
let keys = ca.logical().rechunk();
let keys = keys.downcast_iter().next().unwrap();
let map = &**ca.get_rev_map();
let dtype = ArrowDataType::Dictionary(
IntegerType::UInt32,
Box::new(ArrowDataType::LargeUtf8),
false,
);
match map {
RevMapping::Local(arr) => DictionaryArray::from_data(
cast(keys, &ArrowDataType::Int64)
.unwrap()
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.unwrap()
.clone(),
Box::new(arr.clone()),
),
// Safety:
// the keys are in bounds
RevMapping::Local(arr) => unsafe {
DictionaryArray::try_new_unchecked(
dtype,
cast(keys, &ArrowDataType::Int64)
.unwrap()
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.unwrap()
.clone(),
Box::new(arr.clone()),
)
.unwrap()
},
RevMapping::Global(reverse_map, values, _uuid) => {
let iter = keys
.into_iter()
.map(|opt_k| opt_k.map(|k| *reverse_map.get(k).unwrap() as i64));
let keys = PrimitiveArray::from_trusted_len_iter(iter);

DictionaryArray::from_data(keys, Box::new(values.clone()))
// Safety:
// the keys are in bounds
unsafe {
DictionaryArray::try_new_unchecked(dtype, keys, Box::new(values.clone()))
.unwrap()
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ private = ["polars-time/private"]
[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "98e49133b2e56e51e30335830485b3cf768eb5a2", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "fix_cast_dict", default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
csv-core = { version = "0.1.10", optional = true }
Expand Down
15 changes: 10 additions & 5 deletions polars/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,14 @@ impl<R: Read + Seek> IpcReader<R> {
metadata.schema.clone()
};

let reader = read::FileReader::new(&mut self.reader, metadata, sorted_projection);
let reader =
read::FileReader::new(&mut self.reader, metadata, sorted_projection, self.n_rows);

let include_row_count = self.row_count.is_some();
finish_reader(
reader,
rechunk,
self.n_rows,
None,
predicate,
aggregate,
&schema,
Expand Down Expand Up @@ -198,12 +199,16 @@ where
};

let include_row_count = self.row_count.is_some();
let ipc_reader =
read::FileReader::new(&mut self.reader, metadata.clone(), sorted_projection);
let ipc_reader = read::FileReader::new(
&mut self.reader,
metadata.clone(),
sorted_projection,
self.n_rows,
);
finish_reader(
ipc_reader,
rechunk,
self.n_rows,
None,
None,
None,
&schema,
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ pub use write::{BrotliLevel, CompressionOptions as ParquetCompression, GzipLevel

/// Write a DataFrame to parquet format
///
/// # Example
///
///
#[must_use]
pub struct ParquetWriter<W> {
writer: W,
Expand All @@ -44,6 +41,9 @@ where
}

/// Set the compression used. Defaults to `Lz4Raw`.
///
/// The default compression `Lz4Raw` has very good performance, but may not yet be supported
/// by older readers. If you want more compatibility guarantees, consider using `Snappy`.
pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self {
self.compression = compression;
self
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,9 @@ def write_parquet(
- "brotli"
- "lz4"
- "zstd"
The default compression "lz4" (actually lz4raw) has very good performance, but may not yet be supported
by older readers. If you want more compatibility guarantees, consider using "snappy".
compression_level
Supported by {'gzip', 'brotli', 'zstd'}
- "gzip"
Expand Down

0 comments on commit 5b0e5e2

Please sign in to comment.