Skip to content

Commit

Permalink
Use Arc in Datatype to reduce memory overhead (#3)
Browse files Browse the repository at this point in the history
See jorgecarleitao#1469

---------

Co-authored-by: Clement Rey <cr.rey.clement@gmail.com>
  • Loading branch information
emilk and teh-cmc authored Jan 15, 2024
1 parent 02ff918 commit 33a3200
Show file tree
Hide file tree
Showing 78 changed files with 996 additions and 542 deletions.
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
// initialize an array
let array = Int32Array::from(&[Some(2), None, Some(1), None]);
let array = StructArray::new(
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)].into()),
vec![array.boxed()],
None,
)
Expand Down
3 changes: 2 additions & 1 deletion examples/csv_read_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crossbeam_channel::unbounded;

use std::sync::Arc;
use std::thread;
use std::time::SystemTime;

Expand All @@ -18,7 +19,7 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
let mut reader = read::ReaderBuilder::new().from_path(path)?;
let (fields, _) =
read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Box::new(fields);
let fields = Arc::new(fields);

let start = SystemTime::now();
// spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
Expand Down
7 changes: 5 additions & 2 deletions examples/extension.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::io::{Cursor, Seek, Write};
use std::{
io::{Cursor, Seek, Write},
sync::Arc,
};

use re_arrow2::array::*;
use re_arrow2::chunk::Chunk;
Expand All @@ -10,7 +13,7 @@ use re_arrow2::io::ipc::write;
fn main() -> Result<()> {
// declare an extension.
let extension_type =
DataType::Extension("date16".to_string(), Box::new(DataType::UInt16), None);
DataType::Extension("date16".to_string(), Arc::new(DataType::UInt16), None);

// initialize an array with it.
let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());
Expand Down
5 changes: 2 additions & 3 deletions src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::hash::Hash;
use std::hint::unreachable_unchecked;
use std::{hash::Hash, hint::unreachable_unchecked, sync::Arc};

use crate::{
bitmap::{
Expand Down Expand Up @@ -292,7 +291,7 @@ impl<K: DictionaryKey> DictionaryArray<K> {
}

pub(crate) fn default_data_type(values_datatype: DataType) -> DataType {
DataType::Dictionary(K::KEY_TYPE, Box::new(values_datatype), false)
DataType::Dictionary(K::KEY_TYPE, Arc::new(values_datatype), false)
}

/// Slices this [`DictionaryArray`].
Expand Down
2 changes: 1 addition & 1 deletion src/array/dictionary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<K: DictionaryKey, M: MutableArray> MutableDictionaryArray<K, M> {
fn from_value_map(value_map: ValueMap<K, M>) -> Self {
let keys = MutablePrimitiveArray::<K>::new();
let data_type =
DataType::Dictionary(K::KEY_TYPE, Box::new(value_map.data_type().clone()), false);
DataType::Dictionary(K::KEY_TYPE, Arc::new(value_map.data_type().clone()), false);
Self {
data_type,
map: value_map,
Expand Down
4 changes: 3 additions & 1 deletion src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -203,7 +205,7 @@ impl FixedSizeListArray {

/// Returns a [`DataType`] consistent with [`FixedSizeListArray`].
pub fn default_datatype(data_type: DataType, size: usize) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
let field = Arc::new(Field::new("item", data_type, true));
DataType::FixedSizeList(field, size)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/array/fixed_size_list/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<M: MutableArray> MutableFixedSizeListArray<M> {
/// Creates a new [`MutableFixedSizeListArray`] from a [`MutableArray`] and size.
pub fn new_with_field(values: M, name: &str, nullable: bool, size: usize) -> Self {
let data_type = DataType::FixedSizeList(
Box::new(Field::new(name, values.data_type().clone(), nullable)),
Arc::new(Field::new(name, values.data_type().clone(), nullable)),
size,
);
Self::new_from(values, data_type, size)
Expand Down
4 changes: 3 additions & 1 deletion src/array/list/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -188,7 +190,7 @@ impl<O: Offset> ListArray<O> {
impl<O: Offset> ListArray<O> {
/// Returns a default [`DataType`]: inner field is named "item" and is nullable
pub fn default_datatype(data_type: DataType) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
let field = Arc::new(Field::new("item", data_type, true));
if O::IS_LARGE {
DataType::LargeList(field)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/array/list/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<O: Offset, M: MutableArray> MutableListArray<O, M> {

/// Creates a new [`MutableListArray`] from a [`MutableArray`].
pub fn new_with_field(values: M, name: &str, nullable: bool) -> Self {
let field = Box::new(Field::new(name, values.data_type().clone(), nullable));
let field = Arc::new(Field::new(name, values.data_type().clone(), nullable));
let data_type = if O::IS_LARGE {
DataType::LargeList(field)
} else {
Expand Down
6 changes: 4 additions & 2 deletions src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -28,7 +30,7 @@ pub use mutable::*;
/// Field::new("c", DataType::Int32, false),
/// ];
///
/// let array = StructArray::new(DataType::Struct(fields), vec![boolean, int], None);
/// let array = StructArray::new(DataType::Struct(std::sync::Arc::new(fields)), vec![boolean, int], None);
/// ```
#[derive(Clone)]
pub struct StructArray {
Expand Down Expand Up @@ -153,7 +155,7 @@ impl StructArray {
impl StructArray {
/// Deconstructs the [`StructArray`] into its individual components.
#[must_use]
pub fn into_data(self) -> (Vec<Field>, Vec<Box<dyn Array>>, Option<Bitmap>) {
pub fn into_data(self) -> (Arc<Vec<Field>>, Vec<Box<dyn Array>>, Option<Bitmap>) {
let Self {
data_type,
values,
Expand Down
4 changes: 2 additions & 2 deletions src/array/union/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl UnionArray {
.try_for_each(|(index, (data_type, child))| {
if data_type != child {
Err(Error::oos(format!(
"The children DataTypes of a UnionArray must equal the children data types.
"The children DataTypes of a UnionArray must equal the children data types.
However, the field {index} has data type {data_type:?} but the value has data type {child:?}"
)))
} else {
Expand Down Expand Up @@ -352,7 +352,7 @@ impl UnionArray {
fn try_get_all(data_type: &DataType) -> Result<UnionComponents, Error> {
match data_type.to_logical_type() {
DataType::Union(fields, ids, mode) => {
Ok((fields, ids.as_ref().map(|x| x.as_ref()), *mode))
Ok((fields, ids.as_ref().map(|x| x.as_slice()), *mode))
}
_ => Err(Error::oos(
"The UnionArray requires a logical type of DataType::Union",
Expand Down
8 changes: 4 additions & 4 deletions src/compute/arithmetics/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -96,7 +96,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down Expand Up @@ -161,7 +161,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -176,7 +176,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_York".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down
4 changes: 2 additions & 2 deletions src/compute/cast/dictionary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
} else {
let data_type = DataType::Dictionary(
K2::KEY_TYPE,
Box::new(values.data_type().clone()),
std::sync::Arc::new(values.data_type().clone()),
is_ordered,
);
// Safety: this is safe because given a type `T` that fits in a `usize`, casting it to type `P` either overflows or also fits in a `usize`
Expand All @@ -116,7 +116,7 @@ where
} else {
let data_type = DataType::Dictionary(
K2::KEY_TYPE,
Box::new(values.data_type().clone()),
std::sync::Arc::new(values.data_type().clone()),
is_ordered,
);
// some of the values may not fit in `usize` and thus this needs to be checked
Expand Down
2 changes: 1 addition & 1 deletion src/compute/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ fn cast_list_to_fixed_size_list<O: Offset>(
);
let new_values = cast(sliced_values.as_ref(), inner.data_type(), options)?;
Ok(FixedSizeListArray::new(
DataType::FixedSizeList(Box::new(inner.clone()), size),
DataType::FixedSizeList(std::sync::Arc::new(inner.clone()), size),
new_values,
list.validity().cloned(),
))
Expand Down
3 changes: 2 additions & 1 deletion src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::hash::Hash;
use std::sync::Arc;

use num_traits::{AsPrimitive, Float, ToPrimitive};

Expand Down Expand Up @@ -406,7 +407,7 @@ pub fn timestamp_to_timestamp(
from: &PrimitiveArray<i64>,
from_unit: TimeUnit,
to_unit: TimeUnit,
tz: &Option<String>,
tz: &Option<Arc<String>>,
) -> PrimitiveArray<i64> {
let from_size = time_unit_multiple(from_unit);
let to_size = time_unit_multiple(to_unit);
Expand Down
6 changes: 4 additions & 2 deletions src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use chrono::Datelike;

use crate::{
Expand Down Expand Up @@ -127,7 +129,7 @@ pub fn utf8_to_naive_timestamp_ns<O: Offset>(from: &Utf8Array<O>) -> PrimitiveAr

pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
from: &dyn Array,
timezone: String,
timezone: Arc<String>,
) -> Result<Box<dyn Array>> {
let from = from.as_any().downcast_ref().unwrap();
utf8_to_timestamp_ns::<O>(from, timezone)
Expand All @@ -138,7 +140,7 @@ pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
/// [`crate::temporal_conversions::utf8_to_timestamp_ns`] applied for RFC3339 formatting
pub fn utf8_to_timestamp_ns<O: Offset>(
from: &Utf8Array<O>,
timezone: String,
timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
utf8_to_timestamp_ns_(from, RFC3339, timezone)
}
Expand Down
Loading

0 comments on commit 33a3200

Please sign in to comment.