This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Migrated from Arc<dyn Array> to Box<dyn Array> #1042

Merged (1 commit) on Jun 12, 2022. Changes from all commits are shown below.
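The migration is mechanical: every Arc<dyn Array> in signatures, type aliases, and constructors becomes Box<dyn Array>, and Arc::new(...) / .arced() call sites become Box::new(...) / .boxed(). A minimal sketch of the before/after pattern, using the arrow2 names that appear in the diffs below (illustrative code, not part of the PR):

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;

fn main() {
    let array = Int32Array::from_slice(&[1, 2, 3]);

    // before: columns were shared behind a reference-counted pointer
    // let chunk = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    // after: columns are uniquely owned; `.boxed()` erases the concrete type
    let chunk: Chunk<Box<dyn Array>> = Chunk::new(vec![array.boxed()]);
    assert_eq!(chunk.arrays().len(), 1);
}

The payoff is unique ownership, which enables in-place mutation; the new examples/cow.rs below demonstrates it.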
1 change: 1 addition & 0 deletions Cargo.toml
@@ -17,6 +17,7 @@ bench = false
 [dependencies]
 either = "1.6"
 num-traits = "0.2"
+dyn-clone = "1"
 bytemuck = { version = "1", features = ["derive"] }
 chrono = { version = "0.4", default_features = false, features = ["std"] }
 chrono-tz = { version = "0.6", optional = true }
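The new dyn-clone dependency is what keeps boxed arrays clonable: Arc<dyn Array> implements Clone for free (a reference-count bump), while Box<dyn Array> does not. A minimal sketch of the mechanism, assuming a simplified stand-in for arrow2's Array trait (not the crate's actual definition):

use dyn_clone::DynClone;

// stand-in for arrow2's Array trait; requiring DynClone is the key step
trait Array: DynClone {
    fn len(&self) -> usize;
}
// generates `impl Clone for Box<dyn Array>`, delegating to each concrete type
dyn_clone::clone_trait_object!(Array);

#[derive(Clone)]
struct Int32Array(Vec<i32>);

impl Array for Int32Array {
    fn len(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    let a: Box<dyn Array> = Box::new(Int32Array(vec![1, 2, 3]));
    let b = a.clone(); // clones the concrete array behind the trait object
    assert_eq!(a.len(), b.len());
}

In arrow2 itself such clones stay cheap, since the concrete arrays hold reference-counted buffers internally; the box only determines who owns the array value.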
8 changes: 4 additions & 4 deletions arrow-parquet-integration-testing/src/main.rs
@@ -1,3 +1,6 @@
+use std::fs::File;
+use std::{collections::HashMap, io::Read};
+
 use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use arrow2::{
@@ -15,15 +18,12 @@ use arrow2::{
 };
 use clap::Parser;
 use flate2::read::GzDecoder;
-use std::fs::File;
-use std::sync::Arc;
-use std::{collections::HashMap, io::Read};

 /// Read gzipped JSON file
 pub fn read_gzip_json(
     version: &str,
     file_name: &str,
-) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Arc<dyn Array>>>)> {
+) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Box<dyn Array>>>)> {
     let path = format!(
         "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
         version, file_name
8 changes: 3 additions & 5 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
@@ -34,15 +34,13 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
     let array = Int32Array::from(&[Some(2), None, Some(1), None]);
     let array = StructArray::from_data(
         DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
-        vec![Arc::new(array)],
+        vec![array.boxed()],
         None,
-    );
+    )
+    .boxed();
     // and a field with its datatype
     let field = Field::new("a", array.data_type().clone(), true);

-    // Arc it, since it will be shared with an external program
-    let array: Arc<dyn Array> = Arc::new(array.clone());
-
     // create an iterator of arrays
     let arrays = vec![array.clone(), array.clone(), array];
     let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;
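The deleted "Arc it" step above is the notable part: with dyn-clone providing Clone for Box<dyn Array>, the boxed array can be shared with an external program directly, and the array.clone() calls that build the iterator now operate on the box itself.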
5 changes: 2 additions & 3 deletions arrow-pyarrow-integration-testing/src/lib.rs
@@ -4,7 +4,6 @@ mod c_stream;

 use std::error;
 use std::fmt;
-use std::sync::Arc;

 use pyo3::exceptions::PyOSError;
 use pyo3::ffi::Py_uintptr_t;
@@ -50,7 +49,7 @@ impl From<PyO3Error> for PyErr {
     }
 }

-fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
+fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Box<dyn Array>> {
     // prepare a pointer to receive the Array struct
     let array = Box::new(ffi::ArrowArray::empty());
     let schema = Box::new(ffi::ArrowSchema::empty());
@@ -73,7 +72,7 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
     Ok(array.into())
 }

-fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
+fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
     let array_ptr = Box::new(ffi::ArrowArray::empty());
     let schema_ptr = Box::new(ffi::ArrowSchema::empty());
3 changes: 1 addition & 2 deletions benches/iter_list.rs
@@ -1,5 +1,4 @@
 use std::iter::FromIterator;
-use std::sync::Arc;

 use criterion::{criterion_group, criterion_main, Criterion};

@@ -28,7 +27,7 @@ fn add_benchmark(c: &mut Criterion) {
     let array = ListArray::<i32>::from_data(
         data_type,
         offsets.into(),
-        Arc::new(values),
+        Box::new(values),
         Some(validity),
     );
10 changes: 4 additions & 6 deletions benches/write_csv.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
@@ -8,9 +6,9 @@ use arrow2::error::Result;
 use arrow2::io::csv::write;
 use arrow2::util::bench_util::*;

-type ChunkArc = Chunk<Arc<dyn Array>>;
+type ChunkBox = Chunk<Box<dyn Array>>;

-fn write_batch(columns: &ChunkArc) -> Result<()> {
+fn write_batch(columns: &ChunkBox) -> Result<()> {
     let mut writer = vec![];

     assert_eq!(columns.arrays().len(), 1);
@@ -20,8 +18,8 @@ fn write_batch(columns: &ChunkArc) -> Result<()> {
     write::write_chunk(&mut writer, columns, &options)
 }

-fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
-    Chunk::new(vec![Arc::new(array)])
+fn make_chunk(array: impl Array + 'static) -> Chunk<Box<dyn Array>> {
+    Chunk::new(vec![Box::new(array)])
 }

 fn add_benchmark(c: &mut Criterion) {
2 changes: 1 addition & 1 deletion benches/write_ipc.rs
@@ -11,7 +11,7 @@ use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, cre
 fn write(array: &dyn Array) -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = vec![field].into();
-    let columns = Chunk::try_new(vec![clone(array).into()])?;
+    let columns = Chunk::try_new(vec![clone(array)])?;

     let writer = Cursor::new(vec![]);
     let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;
6 changes: 2 additions & 4 deletions benches/write_parquet.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::{clone, Array};
@@ -9,11 +7,11 @@ use arrow2::error::Result;
 use arrow2::io::parquet::write::*;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

-type ChunkArc = Chunk<Arc<dyn Array>>;
+type ChunkBox = Chunk<Box<dyn Array>>;

 fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
     let schema = Schema::from(vec![Field::new("c1", array.data_type().clone(), true)]);
-    let columns: ChunkArc = Chunk::new(vec![clone(array).into()]);
+    let columns: ChunkBox = Chunk::new(vec![clone(array)]);

     let options = WriteOptions {
         write_statistics: false,
6 changes: 2 additions & 4 deletions examples/avro_read_async.rs
@@ -1,5 +1,3 @@
-use std::sync::Arc;
-
 use futures::pin_mut;
 use futures::StreamExt;
 use tokio::fs::File;
@@ -19,8 +17,8 @@ async fn main() -> Result<()> {
     let mut reader = File::open(file_path).await?.compat();

     let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
-    let avro_schemas = Arc::new(avro_schemas);
-    let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
+    let avro_schemas = Box::new(avro_schemas);
+    let projection = Box::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

     let blocks = block_stream(&mut reader, marker).await;
20 changes: 20 additions & 0 deletions examples/cow.rs
@@ -0,0 +1,20 @@
+// This example demonstrates how to operate on arrays in-place.
+use arrow2::array::{Array, PrimitiveArray};
+
+fn main() {
+    // say we have received an array
+    let mut array: Box<dyn Array> = PrimitiveArray::from_vec(vec![1i32, 2]).boxed();
+
+    // we can apply a transformation to its values without allocating a new array as follows:
+    // 1. downcast it to the correct type (known via `array.data_type().to_physical_type()`)
+    let array = array
+        .as_any_mut()
+        .downcast_mut::<PrimitiveArray<i32>>()
+        .unwrap();
+
+    // 2. call `apply_values` with the function to apply over the values
+    array.apply_values(|x| x.iter_mut().for_each(|x| *x *= 10));
+
+    // confirm that it gives the right result :)
+    assert_eq!(array, &PrimitiveArray::from_vec(vec![10i32, 20]));
+}
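This new example is the main payoff of the migration: a Box<dyn Array> is uniquely owned, so as_any_mut can hand out a mutable reference and apply_values can transform the values without allocating a new array, which a shared Arc<dyn Array> could not permit without first proving the reference count is one. The file name suggests copy-on-write semantics: the buffers inside the array may themselves still be shared, in which case a clone is presumably made before mutating.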
4 changes: 1 addition & 3 deletions examples/csv_read.rs
@@ -1,11 +1,9 @@
-use std::sync::Arc;
-
 use arrow2::array::Array;
 use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::csv::read;

-fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Box<dyn Array>>> {
     // Create a CSV reader. This is typically created on the thread that reads the file and
     // thus owns the read head.
     let mut reader = read::ReaderBuilder::new().from_path(path)?;
5 changes: 2 additions & 3 deletions examples/csv_read_parallel.rs
@@ -1,14 +1,13 @@
 use crossbeam_channel::unbounded;

-use std::sync::Arc;
 use std::thread;
 use std::time::SystemTime;

 use arrow2::array::Array;
 use arrow2::chunk::Chunk;
 use arrow2::{error::Result, io::csv::read};

-fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
+fn parallel_read(path: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
     let batch_size = 100;
     let has_header = true;
     let projection = None;
@@ -19,7 +18,7 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
     let mut reader = read::ReaderBuilder::new().from_path(path)?;
     let (fields, _) =
         read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
-    let fields = Arc::new(fields);
+    let fields = Box::new(fields);

     let start = SystemTime::now();
     // spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
5 changes: 2 additions & 3 deletions examples/csv_write_parallel.rs
@@ -1,7 +1,6 @@
 use std::io::Write;
 use std::sync::mpsc;
 use std::sync::mpsc::{Receiver, Sender};
-use std::sync::Arc;
 use std::thread;

 use arrow2::{
@@ -11,7 +10,7 @@ use arrow2::{
     io::csv::write,
 };

-fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()> {
+fn parallel_write(path: &str, batches: [Chunk<Box<dyn Array>>; 2]) -> Result<()> {
     let options = write::SerializeOptions::default();

     // write a header
@@ -59,7 +58,7 @@ fn main() -> Result<()> {
         Some(5),
         Some(6),
     ]);
-    let columns = Chunk::new(vec![array.arced()]);
+    let columns = Chunk::new(vec![array.boxed()]);

     parallel_write("example.csv", [columns.clone(), columns])
 }
5 changes: 2 additions & 3 deletions examples/extension.rs
@@ -1,5 +1,4 @@
 use std::io::{Cursor, Seek, Write};
-use std::sync::Arc;

 use arrow2::array::*;
 use arrow2::chunk::Chunk;
@@ -40,7 +39,7 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     let options = write::WriteOptions { compression: None };
     let mut writer = write::FileWriter::new(writer, schema, None, options);

-    let batch = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;
+    let batch = Chunk::try_new(vec![Box::new(array) as Box<dyn Array>])?;

     writer.start()?;
     writer.write(&batch, None)?;
@@ -49,7 +48,7 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     Ok(writer.into_inner())
 }

-fn read_ipc(buf: &[u8]) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_ipc(buf: &[u8]) -> Result<Chunk<Box<dyn Array>>> {
     let mut cursor = Cursor::new(buf);
     let metadata = read::read_file_metadata(&mut cursor)?;
     let mut reader = read::FileReader::new(cursor, metadata, None);
5 changes: 2 additions & 3 deletions examples/ffi.rs
@@ -2,10 +2,9 @@ use arrow2::array::{Array, PrimitiveArray};
 use arrow2::datatypes::Field;
 use arrow2::error::Result;
 use arrow2::ffi;
-use std::sync::Arc;

 unsafe fn export(
-    array: Arc<dyn Array>,
+    array: Box<dyn Array>,
     array_ptr: *mut ffi::ArrowArray,
     schema_ptr: *mut ffi::ArrowSchema,
 ) {
@@ -22,7 +21,7 @@ unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Resu

 fn main() -> Result<()> {
     // let's assume that we have an array:
-    let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).arced();
+    let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).boxed();

     // the goal is to export this array and import it back via FFI.
     // to import, we initialize the structs that will receive the data
5 changes: 2 additions & 3 deletions examples/ipc_file_read.rs
@@ -1,5 +1,4 @@
 use std::fs::File;
-use std::sync::Arc;

 use arrow2::array::Array;
 use arrow2::chunk::Chunk;
@@ -9,7 +8,7 @@ use arrow2::io::ipc::read;
 use arrow2::io::print;

 /// Simplest way: read all record batches from the file. This can be used e.g. for random access.
-fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
+fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
     let mut file = File::open(path)?;

     // read the file's metadata. At this point, we can distribute the read however we like.
@@ -25,7 +24,7 @@ fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
 }

 /// Random access way: read a single record batch from the file. This can be used e.g. for random access.
-fn read_batch(path: &str) -> Result<(Schema, Chunk<Arc<dyn Array>>)> {
+fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
     let mut file = File::open(path)?;

     // read the file's metadata. At this point, we can distribute the read however we like.
5 changes: 2 additions & 3 deletions examples/ipc_file_write.rs
@@ -1,13 +1,12 @@
 use std::fs::File;
-use std::sync::Arc;

 use arrow2::array::{Array, Int32Array, Utf8Array};
 use arrow2::chunk::Chunk;
 use arrow2::datatypes::{DataType, Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write;

-fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
+fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Box<dyn Array>>]) -> Result<()> {
     let file = File::create(path)?;

     let options = write::WriteOptions { compression: None };
@@ -35,7 +34,7 @@ fn main() -> Result<()> {
     let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
     let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

-    let batch = Chunk::try_new(vec![a.arced(), b.arced()])?;
+    let batch = Chunk::try_new(vec![a.boxed(), b.boxed()])?;

     // write it
     write_batches(file_path, schema, &[batch])?;
3 changes: 1 addition & 2 deletions examples/json_read.rs
@@ -1,12 +1,11 @@
 /// Example of reading a JSON file.
 use std::fs;
-use std::sync::Arc;

 use arrow2::array::Array;
 use arrow2::error::Result;
 use arrow2::io::json::read;

-fn read_path(path: &str) -> Result<Arc<dyn Array>> {
+fn read_path(path: &str) -> Result<Box<dyn Array>> {
     // read the file into memory (IO-bounded)
     let data = fs::read(path)?;
3 changes: 1 addition & 2 deletions examples/ndjson_read.rs
@@ -1,13 +1,12 @@
 use std::fs::File;
 use std::io::{BufReader, Seek};
-use std::sync::Arc;

 use arrow2::array::Array;
 use arrow2::error::Result;
 use arrow2::io::ndjson::read;
 use arrow2::io::ndjson::read::FallibleStreamingIterator;

-fn read_path(path: &str) -> Result<Vec<Arc<dyn Array>>> {
+fn read_path(path: &str) -> Result<Vec<Box<dyn Array>>> {
     let batch_size = 1024; // number of rows per array
     let mut reader = BufReader::new(File::open(path)?);
3 changes: 1 addition & 2 deletions examples/parquet_read_async.rs
@@ -1,4 +1,3 @@
-use std::sync::Arc;
 use std::time::SystemTime;

 use futures::future::BoxFuture;
@@ -15,7 +14,7 @@ async fn main() -> Result<()> {

     use std::env;
     let args: Vec<String> = env::args().collect();
-    let file_path = Arc::new(args[1].clone());
+    let file_path = Box::new(args[1].clone());

     // # Read metadata
     let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();