diff --git a/.github/workflows/integration-ffi.yml b/.github/workflows/integration-ffi.yml index 8f0797eea12..3fce8f24fd8 100644 --- a/.github/workflows/integration-ffi.yml +++ b/.github/workflows/integration-ffi.yml @@ -38,6 +38,6 @@ jobs: python -m venv venv source venv/bin/activate - pip install maturin==0.10.2 toml==0.10.1 pyarrow==4.0.0 + pip install maturin==0.10.2 toml==0.10.1 pyarrow==5.0.0 maturin develop python -m unittest discover tests diff --git a/arrow-pyarrow-integration-testing/Cargo.toml b/arrow-pyarrow-integration-testing/Cargo.toml index 97a64a9c509..551868e30ea 100644 --- a/arrow-pyarrow-integration-testing/Cargo.toml +++ b/arrow-pyarrow-integration-testing/Cargo.toml @@ -17,13 +17,9 @@ [package] name = "arrow-pyarrow-integration-testing" -description = "" -version = "4.0.0-SNAPSHOT" -homepage = "https://github.com/apache/arrow-rs" -repository = "https://github.com/apache/arrow-rs" -authors = ["Apache Arrow "] +version = "0.0.0" +authors = ["Jorge C. Leitao ", "Apache Arrow "] license = "Apache-2.0" -keywords = [ "arrow" ] edition = "2018" [lib] @@ -31,8 +27,8 @@ name = "arrow_pyarrow_integration_testing" crate-type = ["cdylib"] [dependencies] -arrow2 = { path = "../", default-features = false, features = ["compute"] } -pyo3 = { version = "0.12.1", features = ["extension-module"] } +arrow2 = { path = "../", default-features = false } +pyo3 = { version = "0.14", features = ["extension-module"] } [package.metadata.maturin] requires-dist = ["pyarrow>=1"] diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index 666f433879e..0f6bdf8e2f7 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -1,20 +1,3 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - //! This library demonstrates a minimal usage of Rust's C data interface to pass //! arrays from and to Python. @@ -23,15 +6,11 @@ use std::fmt; use std::sync::Arc; use pyo3::exceptions::PyOSError; +use pyo3::ffi::Py_uintptr_t; +use pyo3::prelude::*; use pyo3::wrap_pyfunction; -use pyo3::{libc::uintptr_t, prelude::*}; - -use arrow2::array::{Array, Int64Array}; -use arrow2::ffi; -use arrow2::{array::PrimitiveArray, compute}; -use arrow2::{datatypes::DataType, error::ArrowError}; -type ArrayRef = Arc; +use arrow2::{array::Array, datatypes::Field, error::ArrowError, ffi}; /// an error that bridges ArrowError with a Python error #[derive(Debug)] @@ -70,130 +49,107 @@ impl From for PyErr { } } -fn to_rust(ob: PyObject, py: Python) -> PyResult { +fn to_rust_array(ob: PyObject, py: Python) -> PyResult> { // prepare a pointer to receive the Array struct - let array = Arc::new(ffi::create_empty()); - let (array_ptr, schema_ptr) = array.references(); + let array = Box::new(ffi::Ffi_ArrowArray::empty()); + let schema = Box::new(ffi::Ffi_ArrowSchema::empty()); + + let array_ptr = &*array as *const ffi::Ffi_ArrowArray; + let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema; // make the conversion through PyArrow's private API // this changes the pointer's memory and is thus unsafe. 
In particular, `_export_to_c` can go out of bounds ob.call_method1( py, "_export_to_c", - (array_ptr as uintptr_t, schema_ptr as uintptr_t), + (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), )?; - Ok(ffi::try_from(array).map_err(PyO3ArrowError::from)?.into()) + let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?; + let array = ffi::import_array_from_c(array, &field).map_err(PyO3ArrowError::from)?; + + Ok(array.into()) } -fn to_py(array: ArrayRef, py: Python) -> PyResult { - let array_ptr = ffi::export_to_c(array).map_err(PyO3ArrowError::from)?; +fn to_py_array(array: Arc, py: Python) -> PyResult { + let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); + let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + + let array_ptr = Box::into_raw(array_ptr); + let schema_ptr = Box::into_raw(schema_ptr); - let (array_ptr, schema_ptr) = array_ptr.references(); + unsafe { + ffi::export_field_to_c(&Field::new("", array.data_type().clone(), true), schema_ptr); + ffi::export_array_to_c(array, array_ptr); + }; let pa = py.import("pyarrow")?; let array = pa.getattr("Array")?.call_method1( "_import_from_c", - (array_ptr as uintptr_t, schema_ptr as uintptr_t), + (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), )?; - Ok(array.to_object(py)) -} + unsafe { + Box::from_raw(array_ptr); + Box::from_raw(schema_ptr); + }; -/// Returns `array + array` of an int64 array. 
-#[pyfunction] -fn double(array: PyObject, py: Python) -> PyResult { - // import - let array = to_rust(array, py)?; - - // perform some operation - let array = array.as_any().downcast_ref::().ok_or_else(|| { - PyO3ArrowError::ArrowError(ArrowError::Ffi("Expects an int64".to_string())) - })?; - let array = - compute::arithmetics::basic::add::add(&array, &array).map_err(PyO3ArrowError::from)?; - let array = Arc::new(array); - - // export - to_py(array, py) + Ok(array.to_object(py)) } -/// calls a lambda function that receives and returns an array -/// whose result must be the array multiplied by two -#[pyfunction] -fn double_py(lambda: PyObject, py: Python) -> PyResult { - // create - let array = Arc::new(PrimitiveArray::::from(vec![Some(1), None, Some(3)])); - let expected = Arc::new(PrimitiveArray::::from(vec![Some(2), None, Some(6)])) as ArrayRef; +fn to_rust_field(ob: PyObject, py: Python) -> PyResult { + // prepare a pointer to receive the Schema struct + let schema = Box::new(ffi::Ffi_ArrowSchema::empty()); - // to py - let array = to_py(array, py)?; + let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema; - let array = lambda.call1(py, (array,))?; + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. 
In particular, `_export_to_c` can go out of bounds + ob.call_method1(py, "_export_to_c", (schema_ptr as Py_uintptr_t,))?; - let array = to_rust(array, py)?; + let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?; - Ok(array == expected) + Ok(field) } -/// Returns the substring -#[pyfunction] -fn substring(array: PyObject, start: i64, py: Python) -> PyResult { - // import - let array = to_rust(array, py)?; +fn to_py_field(field: &Field, py: Python) -> PyResult { + let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + let schema_ptr = Box::into_raw(schema_ptr); - // substring - let array = compute::substring::substring(array.as_ref(), start, &None) - .map_err(PyO3ArrowError::from)? - .into(); + unsafe { + ffi::export_field_to_c(field, schema_ptr); + }; - // export - to_py(array, py) -} + let pa = py.import("pyarrow")?; -/// Returns the concatenate -#[pyfunction] -fn concatenate(array: PyObject, py: Python) -> PyResult { - // import - let array = to_rust(array, py)?; + let array = pa + .getattr("Field")? + .call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?; - // concat - let array = compute::concat::concatenate(&[array.as_ref(), array.as_ref()]) - .map_err(PyO3ArrowError::from)? 
- .into(); + unsafe { Box::from_raw(schema_ptr) }; - // export - to_py(array, py) + Ok(array.to_object(py)) } /// Converts to rust and back to python #[pyfunction] -fn round_trip(array: PyObject, py: Python) -> PyResult { +fn round_trip_array(array: PyObject, py: Python) -> PyResult { // import - let array = to_rust(array, py)?; + let array = to_rust_array(array, py)?; // export - to_py(array, py) + to_py_array(array, py) } /// Converts to rust and back to python #[pyfunction] -fn import_primitive(array: PyObject, py: Python) -> PyResult { - let array = to_rust(array, py)?; - let expected = Arc::new(PrimitiveArray::::from(vec![Some(2), None, Some(6)])) as ArrayRef; - - Ok(array == expected) -} - -/// Converts to rust and back to python -#[pyfunction] -fn export_primitive(py: Python) -> PyResult { - let array = Arc::new(PrimitiveArray::::from(vec![Some(2), None, Some(6)])) as ArrayRef; - - let array = to_py(array, py)?; +fn round_trip_field(array: PyObject, py: Python) -> PyResult { + // import + let field = to_rust_field(array, py)?; - Ok(array) + // export + to_py_field(&field, py) } #[pyfunction] @@ -203,13 +159,8 @@ fn total_allocated_bytes() -> PyResult { #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { - m.add_wrapped(wrap_pyfunction!(total_allocated_bytes))?; - m.add_wrapped(wrap_pyfunction!(export_primitive))?; - m.add_wrapped(wrap_pyfunction!(import_primitive))?; - m.add_wrapped(wrap_pyfunction!(double))?; - m.add_wrapped(wrap_pyfunction!(double_py))?; - m.add_wrapped(wrap_pyfunction!(substring))?; - m.add_wrapped(wrap_pyfunction!(concatenate))?; - m.add_wrapped(wrap_pyfunction!(round_trip))?; + m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?; + m.add_function(wrap_pyfunction!(round_trip_array, m)?)?; + m.add_function(wrap_pyfunction!(round_trip_field, m)?)?; Ok(()) } diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 
d891eb27b90..e28834b1da2 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -40,60 +40,12 @@ def tearDown(self): # No leak of C++ memory self.assertEqual(self.old_allocated_cpp, pyarrow.total_allocated_bytes()) - def test_primitive_python(self): - """ - Python -> Rust -> Python - """ - a = pyarrow.array([1, 2, 3]) - b = arrow_pyarrow_integration_testing.double(a) - self.assertEqual(b, pyarrow.array([2, 4, 6])) - - def test_primitive_rust(self): - """ - Rust -> Python -> Rust - """ - - def double(array): - array = array.to_pylist() - return pyarrow.array([x * 2 if x is not None else None for x in array]) - - is_correct = arrow_pyarrow_integration_testing.double_py(double) - self.assertTrue(is_correct) - - def test_import_primitive(self): - """ - Python -> Rust - """ - old_allocated = pyarrow.total_allocated_bytes() - - a = pyarrow.array([2, None, 6]) - - is_correct = arrow_pyarrow_integration_testing.import_primitive(a) - self.assertTrue(is_correct) - # No leak of C++ memory - del a - self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) - - def test_export_primitive(self): - """ - Python -> Rust - """ - old_allocated = pyarrow.total_allocated_bytes() - - expected = pyarrow.array([2, None, 6]) - - result = arrow_pyarrow_integration_testing.export_primitive() - self.assertEqual(expected, result) - # No leak of C++ memory - del expected - self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) - def test_string_roundtrip(self): """ Python -> Rust -> Python """ a = pyarrow.array(["a", None, "ccc"]) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) c = pyarrow.array(["a", None, "ccc"]) self.assertEqual(b, c) @@ -107,26 +59,9 @@ def test_decimal_roundtrip(self): None, ] a = pyarrow.array(data, pyarrow.decimal128(5, 2)) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = 
arrow_pyarrow_integration_testing.round_trip_array(a) self.assertEqual(a, b) - def test_string_python(self): - """ - Python -> Rust -> Python - """ - a = pyarrow.array(["a", None, "ccc"]) - b = arrow_pyarrow_integration_testing.substring(a, 1) - self.assertEqual(b, pyarrow.array(["", None, "cc"])) - - def test_time32_python(self): - """ - Python -> Rust -> Python - """ - a = pyarrow.array([None, 1, 2], pyarrow.time32("s")) - b = arrow_pyarrow_integration_testing.concatenate(a) - expected = pyarrow.array([None, 1, 2] + [None, 1, 2], pyarrow.time32("s")) - self.assertEqual(b, expected) - def test_list_array(self): """ Python -> Rust -> Python @@ -135,7 +70,7 @@ def test_list_array(self): a = pyarrow.array( [[], None, [1, 2], [4, 5, 6]], pyarrow.list_(pyarrow.int64()) ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() @@ -159,7 +94,7 @@ def test_struct_array(self): ], pyarrow.struct(fields), ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() @@ -174,7 +109,7 @@ def test_list_list_array(self): [[None], None, [[1], [2]], [[4, 5], [6]]], pyarrow.list_(pyarrow.list_(pyarrow.int64())), ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() @@ -188,7 +123,7 @@ def test_dict(self): ["a", "a", "b", None, "c"], pyarrow.dictionary(pyarrow.int64(), pyarrow.utf8()), ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() @@ -205,7 +140,7 @@ def test_sparse_union(self): pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()), ], ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = 
arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() @@ -223,8 +158,24 @@ def test_dense_union(self): pyarrow.array([0, 1, 2, None, 0], pyarrow.int64()), ], ) - b = arrow_pyarrow_integration_testing.round_trip(a) + b = arrow_pyarrow_integration_testing.round_trip_array(a) b.validate(full=True) assert a.to_pylist() == b.to_pylist() assert a.type == b.type + + def test_field(self): + field = pyarrow.field("aa", pyarrow.bool_()) + result = arrow_pyarrow_integration_testing.round_trip_field(field) + assert field == result + + def test_field_nested(self): + field = pyarrow.field("aa", pyarrow.list_(pyarrow.field("ab", pyarrow.bool_()))) + result = arrow_pyarrow_integration_testing.round_trip_field(field) + assert field == result + + def test_field_metadata(self): + field = pyarrow.field("aa", pyarrow.bool_(), {"a": "b"}) + result = arrow_pyarrow_integration_testing.round_trip_field(field) + assert field == result + assert field.metadata == result.metadata diff --git a/examples/ffi.rs b/examples/ffi.rs new file mode 100644 index 00000000000..172c3159b4a --- /dev/null +++ b/examples/ffi.rs @@ -0,0 +1,55 @@ +use arrow2::array::{Array, PrimitiveArray}; +use arrow2::datatypes::Field; +use arrow2::error::Result; +use arrow2::ffi; +use std::sync::Arc; + +unsafe fn export( + array: Arc, + array_ptr: *mut ffi::Ffi_ArrowArray, + schema_ptr: *mut ffi::Ffi_ArrowSchema, +) { + let field = Field::new("a", array.data_type().clone(), true); + ffi::export_array_to_c(array, array_ptr); + ffi::export_field_to_c(&field, schema_ptr); +} + +fn import( + array: Box, + schema: &ffi::Ffi_ArrowSchema, +) -> Result> { + let field = ffi::import_field_from_c(schema)?; + ffi::import_array_from_c(array, &field) +} + +fn main() -> Result<()> { + // let's assume that we have an array: + let array = Arc::new(PrimitiveArray::::from([Some(1), None, Some(123)])) as Arc; + + // the goal is to export this array and import it back via FFI. 
+ // to import, we initialize the structs that will receive the data + let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); + let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + + // since FFIs work in raw pointers, let's temporarily relinquish ownership so that producers + // can write into it in a thread-safe manner + let array_ptr = Box::into_raw(array_ptr); + let schema_ptr = Box::into_raw(schema_ptr); + + // this is where a producer (in this case also us ^_^) writes to the pointers' location. + // `array` here could be anything or not even be available, if this was e.g. from Python. + // Safety: we just allocated the pointers correctly. + unsafe { export(array.clone(), array_ptr, schema_ptr) }; + + // we can now take ownership back, since we are responsible for deallocating this memory. + // Safety: we just into_raw them. + let array_ptr = unsafe { Box::from_raw(array_ptr) }; + let schema_ptr = unsafe { Box::from_raw(schema_ptr) }; + + // and finally interpret the written memory into a new array. + let new_array = import(array_ptr, schema_ptr.as_ref())?; + + // which is equal to the exported array + assert_eq!(array.as_ref(), new_array.as_ref()); + Ok(()) +} diff --git a/guide/src/ffi.md b/guide/src/ffi.md index 2cf87b0b9ce..40d01984c38 100644 --- a/guide/src/ffi.md +++ b/guide/src/ffi.md @@ -5,58 +5,9 @@ has a specification, which allows languages to share data structures via foreign interfaces at zero cost (i.e. via pointers). This is known as the [C Data interface](https://arrow.apache.org/docs/format/CDataInterface.html). -This crate supports importing from and exporting to most of `DataType`s. 
-Types currently not supported: - -* `FixedSizeBinary` -* `Union` -* `Dictionary` -* `FixedSizeList` -* `Null` - -## Export - -The API to export an `Array` is as follows: - -```rust -use std::sync::Arc; -use arrow2::array::{Array, PrimitiveArray}; -use arrow2::datatypes::DataType; -use arrow2::ffi::ArrowArray; - -# fn main() { -// Example of an array: -let array = [Some(1), None, Some(123)] - .iter() - .collect::>(); - -// export the array. -let ffi_array = ffi::export_to_c(Arc::new(array))?; - -// these are mutable pointers to `ArrowArray` and `ArrowSchema` of the C data interface -let (array_ptr, schema_ptr) = ffi_array.references(); -# } -``` - -## Import - -The API to import works similarly: +This crate supports importing from and exporting to all `DataType`s. Follow the +example below to learn how to import and export: ```rust -use arrow2::array::Array; -use arrow2::ffi; - -let array = Arc::new(ffi::create_empty()); - -// non-owned mutable pointers. -let (array_ptr, schema_ptr) = array.references(); - -// write to the pointers using any C data interface exporter - -// consume it to a `Box` -let array = ffi::try_from(array)?; +{{#include ../../examples/ffi.rs}} ``` - -This assumes that the exporter writes to `array_ptr` and `schema_ptr` -according to the c data interface. This is an intrinsically `unsafe` operation. -Failing to do so results in UB. 
diff --git a/src/array/binary/ffi.rs b/src/array/binary/ffi.rs index a216be7b421..63d0ea66f36 100644 --- a/src/array/binary/ffi.rs +++ b/src/array/binary/ffi.rs @@ -25,7 +25,7 @@ unsafe impl ToFfi for BinaryArray { unsafe impl FromFfi for BinaryArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.field()?.data_type().clone(); + let data_type = array.field().data_type().clone(); let expected = if O::is_large() { DataType::LargeBinary } else { diff --git a/src/array/boolean/ffi.rs b/src/array/boolean/ffi.rs index b141c3a3a69..9dedfbc41de 100644 --- a/src/array/boolean/ffi.rs +++ b/src/array/boolean/ffi.rs @@ -23,7 +23,7 @@ unsafe impl ToFfi for BooleanArray { unsafe impl FromFfi for BooleanArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.field()?.data_type().clone(); + let data_type = array.field().data_type().clone(); assert_eq!(data_type, DataType::Boolean); let length = array.array().len(); let offset = array.array().offset(); diff --git a/src/array/list/ffi.rs b/src/array/list/ffi.rs index 63d6e8f5863..97bea8cbc32 100644 --- a/src/array/list/ffi.rs +++ b/src/array/list/ffi.rs @@ -24,7 +24,7 @@ unsafe impl ToFfi for ListArray { unsafe impl FromFfi for ListArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.field()?.data_type().clone(); + let data_type = array.field().data_type().clone(); let length = array.array().len(); let offset = array.array().offset(); let mut validity = unsafe { array.validity() }?; diff --git a/src/array/primitive/ffi.rs b/src/array/primitive/ffi.rs index 5168f322b70..1c3f534970d 100644 --- a/src/array/primitive/ffi.rs +++ b/src/array/primitive/ffi.rs @@ -24,7 +24,7 @@ unsafe impl ToFfi for PrimitiveArray { unsafe impl FromFfi for PrimitiveArray { fn try_from_ffi(array: A) -> Result { - let data_type = array.field()?.data_type().clone(); + let data_type = array.field().data_type().clone(); let length = array.array().len(); let offset = array.array().offset(); let mut validity = unsafe 
{ array.validity() }?; diff --git a/src/array/struct_.rs b/src/array/struct_.rs index 9a6191a5d47..41763c464ee 100644 --- a/src/array/struct_.rs +++ b/src/array/struct_.rs @@ -153,7 +153,7 @@ unsafe impl ToFfi for StructArray { unsafe impl FromFfi for StructArray { fn try_from_ffi(array: A) -> Result { - let field = array.field()?; + let field = array.field(); let fields = Self::get_fields(field.data_type()).to_vec(); let length = array.array().len(); diff --git a/src/array/union/ffi.rs b/src/array/union/ffi.rs index 7028480e049..f07b7092eab 100644 --- a/src/array/union/ffi.rs +++ b/src/array/union/ffi.rs @@ -29,7 +29,7 @@ unsafe impl ToFfi for UnionArray { unsafe impl FromFfi for UnionArray { fn try_from_ffi(array: A) -> Result { - let field = array.field()?; + let field = array.field(); let data_type = field.data_type().clone(); let fields = Self::get_fields(field.data_type()); diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 8f947d1b1c6..833657bc272 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -32,8 +32,7 @@ use crate::{ /// * the data type is not supported /// * the interface is not valid (e.g. 
a null pointer) pub fn try_from(array: A) -> Result> { - let field = array.field()?; - let array: Box = match field.data_type() { + let array: Box = match array.field().data_type() { DataType::Boolean => Box::new(BooleanArray::try_from_ffi(array)?), DataType::Int8 => Box::new(PrimitiveArray::::try_from_ffi(array)?), DataType::Int16 => Box::new(PrimitiveArray::::try_from_ffi(array)?), diff --git a/src/ffi/ffi.rs b/src/ffi/ffi.rs index 756b0b04da8..cbb08f2fa6e 100644 --- a/src/ffi/ffi.rs +++ b/src/ffi/ffi.rs @@ -17,7 +17,6 @@ use std::{ptr::NonNull, sync::Arc}; -use super::schema::{to_field, Ffi_ArrowSchema}; use crate::{ array::{buffers_children_dictionary, Array}, bitmap::{utils::bytes_for, Bitmap}, @@ -27,6 +26,7 @@ use crate::{ }, datatypes::{DataType, Field}, error::{ArrowError, Result}, + ffi::schema::get_field_child, types::NativeType, }; @@ -95,7 +95,7 @@ impl Ffi_ArrowArray { /// # Safety /// This method releases `buffers`. Consumers of this struct *must* call `release` before /// releasing this struct, or contents in `buffers` leak. 
- fn new(array: Arc) -> Self { + pub(crate) fn new(array: Arc) -> Self { let (buffers, children, dictionary) = buffers_children_dictionary(array.as_ref()); let buffers_ptr = buffers @@ -142,7 +142,7 @@ impl Ffi_ArrowArray { } // create an empty `Ffi_ArrowArray`, which can be used to import data into - fn empty() -> Self { + pub fn empty() -> Self { Self { length: 0, null_count: 0, @@ -158,22 +158,17 @@ impl Ffi_ArrowArray { } /// the length of the array - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { self.length as usize } - /// whether the array is empty - pub fn is_empty(&self) -> bool { - self.length == 0 - } - /// the offset of the array - pub fn offset(&self) -> usize { + pub(crate) fn offset(&self) -> usize { self.offset as usize } /// the null count of the array - pub fn null_count(&self) -> usize { + pub(crate) fn null_count(&self) -> usize { self.null_count as usize } } @@ -283,31 +278,32 @@ fn buffer_len(array: &Ffi_ArrowArray, data_type: &DataType, i: usize) -> Result< fn create_child( array: &Ffi_ArrowArray, - schema: &Ffi_ArrowSchema, + field: &Field, parent: Arc, index: usize, ) -> Result> { + let field = get_field_child(field, index)?; assert!(index < array.n_children as usize); assert!(!array.children.is_null()); unsafe { let arr_ptr = *array.children.add(index); - let schema_ptr = schema.child(index); assert!(!arr_ptr.is_null()); let arr_ptr = &*arr_ptr; - Ok(ArrowArrayChild::from_raw(arr_ptr, schema_ptr, parent)) + + Ok(ArrowArrayChild::from_raw(arr_ptr, field, parent)) } } fn create_dictionary( array: &Ffi_ArrowArray, - schema: &Ffi_ArrowSchema, + field: &Field, parent: Arc, ) -> Result>> { - let schema = schema.dictionary(); - if let Some(schema) = schema { + if let DataType::Dictionary(_, values) = field.data_type() { + let field = Field::new("", values.as_ref().clone(), true); assert!(!array.dictionary.is_null()); let array = unsafe { &*array.dictionary }; - Ok(Some(ArrowArrayChild::from_raw(array, schema, parent))) + 
Ok(Some(ArrowArrayChild::from_raw(array, field, parent))) } else { Ok(None) } @@ -339,7 +335,7 @@ pub trait ArrowArrayRef { // +1 to ignore null bitmap create_buffer::( self.array(), - self.field()?.data_type(), + self.field().data_type(), self.deallocation(), index + 1, ) @@ -354,17 +350,16 @@ pub trait ArrowArrayRef { } fn child(&self, index: usize) -> Result { - create_child(self.array(), self.schema(), self.parent().clone(), index) + create_child(self.array(), self.field(), self.parent().clone(), index) } fn dictionary(&self) -> Result> { - create_dictionary(self.array(), self.schema(), self.parent().clone()) + create_dictionary(self.array(), self.field(), self.parent().clone()) } fn parent(&self) -> &Arc; fn array(&self) -> &Ffi_ArrowArray; - fn schema(&self) -> &Ffi_ArrowSchema; - fn field(&self) -> Result; + fn field(&self) -> &Field; } /// Struct used to move an Array from and to the C Data Interface. @@ -388,14 +383,20 @@ pub trait ArrowArrayRef { /// Furthermore, this struct assumes that the incoming data agrees with the C data interface. 
#[derive(Debug)] pub struct ArrowArray { - array: Arc, - schema: Arc, + array: Box, + field: Field, +} + +impl ArrowArray { + pub fn new(array: Box, field: Field) -> Self { + Self { array, field } + } } impl ArrowArrayRef for Arc { /// the data_type as declared in the schema - fn field(&self) -> Result { - to_field(&self.schema) + fn field(&self) -> &Field { + &self.field } fn parent(&self) -> &Arc { @@ -405,23 +406,19 @@ impl ArrowArrayRef for Arc { fn array(&self) -> &Ffi_ArrowArray { self.array.as_ref() } - - fn schema(&self) -> &Ffi_ArrowSchema { - self.schema.as_ref() - } } #[derive(Debug)] pub struct ArrowArrayChild<'a> { array: &'a Ffi_ArrowArray, - schema: &'a Ffi_ArrowSchema, + field: Field, parent: Arc, } impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { /// the data_type as declared in the schema - fn field(&self) -> Result { - to_field(self.schema) + fn field(&self) -> &Field { + &self.field } fn parent(&self) -> &Arc { @@ -431,48 +428,14 @@ impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { fn array(&self) -> &Ffi_ArrowArray { self.array } - - fn schema(&self) -> &Ffi_ArrowSchema { - self.schema - } } impl<'a> ArrowArrayChild<'a> { - fn from_raw( - array: &'a Ffi_ArrowArray, - schema: &'a Ffi_ArrowSchema, - parent: Arc, - ) -> Self { + fn from_raw(array: &'a Ffi_ArrowArray, field: Field, parent: Arc) -> Self { Self { array, - schema, + field, parent, } } } - -/// Exports an `Array` to the C data interface. 
-pub fn export_to_c(array: Arc) -> Result { - let field = Field::new("", array.data_type().clone(), array.null_count() != 0); - - Ok(ArrowArray { - array: Arc::new(Ffi_ArrowArray::new(array)), - schema: Arc::new(Ffi_ArrowSchema::try_new(field)?), - }) -} - -pub fn create_empty() -> ArrowArray { - ArrowArray { - array: Arc::new(Ffi_ArrowArray::empty()), - schema: Arc::new(Ffi_ArrowSchema::empty()), - } -} - -impl ArrowArray { - pub fn references(&self) -> (*mut Ffi_ArrowArray, *mut Ffi_ArrowSchema) { - ( - self.array.as_ref() as *const Ffi_ArrowArray as *mut Ffi_ArrowArray, - self.schema.as_ref() as *const Ffi_ArrowSchema as *mut Ffi_ArrowSchema, - ) - } -} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 19fa83a8fbf..15504487c94 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -5,5 +5,40 @@ mod array; mod ffi; mod schema; -pub use array::try_from; -pub use ffi::{create_empty, export_to_c, ArrowArray, ArrowArrayRef}; +pub(crate) use array::try_from; +pub(crate) use ffi::{ArrowArray, ArrowArrayRef}; + +use std::sync::Arc; + +use crate::array::Array; +use crate::datatypes::Field; +use crate::error::Result; + +pub use ffi::Ffi_ArrowArray; +pub use schema::Ffi_ArrowSchema; + +use self::schema::to_field; + +/// Exports an `Array` to the C data interface. +/// # Safety +/// The pointer must be allocated and valid +pub unsafe fn export_array_to_c(array: Arc, ptr: *mut Ffi_ArrowArray) { + *ptr = Ffi_ArrowArray::new(array); +} + +/// Exports a [`Field`] to the C data interface. +/// # Safety +/// The pointer must be allocated and valid +pub unsafe fn export_field_to_c(field: &Field, ptr: *mut Ffi_ArrowSchema) { + *ptr = Ffi_ArrowSchema::new(field) +} + +/// Imports a [`Field`] from the C data interface. +pub fn import_field_from_c(field: &Ffi_ArrowSchema) -> Result { + to_field(field) +} + +/// Imports an [`Array`] from the C data interface. 
+pub fn import_array_from_c(array: Box, field: &Field) -> Result> { + try_from(Arc::new(ArrowArray::new(array, field.clone()))) +} diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index 926871af11e..75a06650004 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -7,14 +7,13 @@ use crate::{ #[allow(dead_code)] struct SchemaPrivateData { - field: Field, children_ptr: Box<[*mut Ffi_ArrowSchema]>, dictionary: Option<*mut Ffi_ArrowSchema>, } /// ABI-compatible struct for `ArrowSchema` from C Data Interface /// See -/// This was created by bindgen +// This was created by bindgen #[repr(C)] #[derive(Debug)] pub struct Ffi_ArrowSchema { @@ -52,27 +51,27 @@ unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { } impl Ffi_ArrowSchema { - /// create a new [`Ffi_ArrowSchema`]. This fails if the fields' [`DataType`] is not supported. - pub fn try_new(field: Field) -> Result { - let format = to_format(field.data_type())?; + /// creates a new [Ffi_ArrowSchema] + pub(crate) fn new(field: &Field) -> Self { + let format = to_format(field.data_type()); let name = field.name().clone(); // allocate (and hold) the children let children_vec = match field.data_type() { DataType::List(field) => { - vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] + vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] } DataType::LargeList(field) => { - vec![Box::new(Ffi_ArrowSchema::try_new(field.as_ref().clone())?)] + vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] } DataType::Struct(fields) => fields .iter() - .map(|field| Ok(Box::new(Ffi_ArrowSchema::try_new(field.clone())?))) - .collect::>>()?, + .map(|field| Box::new(Ffi_ArrowSchema::new(field))) + .collect::>(), DataType::Union(fields, _, _) => fields .iter() - .map(|field| Ok(Box::new(Ffi_ArrowSchema::try_new(field.clone())?))) - .collect::>>()?, + .map(|field| Box::new(Ffi_ArrowSchema::new(field))) + .collect::>(), _ => vec![], }; // note: this cannot be done along with the above because the above is 
fallible and this op leaks. @@ -86,20 +85,19 @@ impl Ffi_ArrowSchema { let dictionary = if let DataType::Dictionary(_, values) = field.data_type() { // we do not store field info in the dict values, so can't recover it all :( - let field = Field::new("item", values.as_ref().clone(), true); - Some(Box::new(Ffi_ArrowSchema::try_new(field)?)) + let field = Field::new("", values.as_ref().clone(), true); + Some(Box::new(Ffi_ArrowSchema::new(&field))) } else { None }; let mut private = Box::new(SchemaPrivateData { - field, children_ptr, dictionary: dictionary.map(Box::into_raw), }); // - Ok(Ffi_ArrowSchema { + Self { format: CString::new(format).unwrap().into_raw(), name: CString::new(name).unwrap().into_raw(), metadata: std::ptr::null_mut(), @@ -109,7 +107,7 @@ impl Ffi_ArrowSchema { dictionary: private.dictionary.unwrap_or(std::ptr::null_mut()), release: Some(c_release_schema), private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void, - }) + } } /// create an empty [Ffi_ArrowSchema] @@ -128,7 +126,7 @@ impl Ffi_ArrowSchema { } /// returns the format of this schema. - pub fn format(&self) -> &str { + pub(crate) fn format(&self) -> &str { assert!(!self.format.is_null()); // safe because the lifetime of `self.format` equals `self` unsafe { CStr::from_ptr(self.format) } @@ -137,26 +135,26 @@ impl Ffi_ArrowSchema { } /// returns the name of this schema. 
- pub fn name(&self) -> &str { + pub(crate) fn name(&self) -> &str { assert!(!self.name.is_null()); // safe because the lifetime of `self.name` equals `self` unsafe { CStr::from_ptr(self.name) }.to_str().unwrap() } - pub fn child(&self, index: usize) -> &'static Self { + pub(crate) fn child(&self, index: usize) -> &'static Self { assert!(index < self.n_children as usize); assert!(!self.name.is_null()); unsafe { self.children.add(index).as_ref().unwrap().as_ref().unwrap() } } - pub fn dictionary(&self) -> Option<&'static Self> { + pub(crate) fn dictionary(&self) -> Option<&'static Self> { if self.dictionary.is_null() { return None; }; Some(unsafe { self.dictionary.as_ref().unwrap() }) } - pub fn nullable(&self) -> bool { + pub(crate) fn nullable(&self) -> bool { (self.flags / 2) & 1 == 1 } } @@ -174,8 +172,11 @@ pub fn to_field(schema: &Ffi_ArrowSchema) -> Result { let dictionary = schema.dictionary(); let data_type = if let Some(dictionary) = dictionary { let indices_data_type = to_data_type(schema)?; - let values_data_type = to_data_type(dictionary)?; - DataType::Dictionary(Box::new(indices_data_type), Box::new(values_data_type)) + let values = to_field(dictionary)?; + DataType::Dictionary( + Box::new(indices_data_type), + Box::new(values.data_type().clone()), + ) } else { to_data_type(schema)? 
}; @@ -285,56 +286,62 @@ fn to_data_type(schema: &Ffi_ArrowSchema) -> Result { } /// the inverse of [to_field] -fn to_format(data_type: &DataType) -> Result { - Ok(match data_type { - DataType::Null => "n", - DataType::Boolean => "b", - DataType::Int8 => "c", - DataType::UInt8 => "C", - DataType::Int16 => "s", - DataType::UInt16 => "S", - DataType::Int32 => "i", - DataType::UInt32 => "I", - DataType::Int64 => "l", - DataType::UInt64 => "L", - DataType::Float16 => "e", - DataType::Float32 => "f", - DataType::Float64 => "g", - DataType::Binary => "z", - DataType::LargeBinary => "Z", - DataType::Utf8 => "u", - DataType::LargeUtf8 => "U", - DataType::Date32 => "tdD", - DataType::Date64 => "tdm", - DataType::Time32(TimeUnit::Second) => "tts", - DataType::Time32(TimeUnit::Millisecond) => "ttm", - DataType::Time64(TimeUnit::Microsecond) => "ttu", - DataType::Time64(TimeUnit::Nanosecond) => "ttn", - DataType::Duration(TimeUnit::Second) => "tDs", - DataType::Duration(TimeUnit::Millisecond) => "tDm", - DataType::Duration(TimeUnit::Microsecond) => "tDu", - DataType::Duration(TimeUnit::Nanosecond) => "tDn", - DataType::Interval(IntervalUnit::YearMonth) => "tiM", - DataType::Interval(IntervalUnit::DayTime) => "tiD", +fn to_format(data_type: &DataType) -> String { + match data_type { + DataType::Null => "n".to_string(), + DataType::Boolean => "b".to_string(), + DataType::Int8 => "c".to_string(), + DataType::UInt8 => "C".to_string(), + DataType::Int16 => "s".to_string(), + DataType::UInt16 => "S".to_string(), + DataType::Int32 => "i".to_string(), + DataType::UInt32 => "I".to_string(), + DataType::Int64 => "l".to_string(), + DataType::UInt64 => "L".to_string(), + DataType::Float16 => "e".to_string(), + DataType::Float32 => "f".to_string(), + DataType::Float64 => "g".to_string(), + DataType::Binary => "z".to_string(), + DataType::LargeBinary => "Z".to_string(), + DataType::Utf8 => "u".to_string(), + DataType::LargeUtf8 => "U".to_string(), + DataType::Date32 => "tdD".to_string(), + 
DataType::Date64 => "tdm".to_string(), + DataType::Time32(TimeUnit::Second) => "tts".to_string(), + DataType::Time32(TimeUnit::Millisecond) => "ttm".to_string(), + DataType::Time32(_) => { + unreachable!("Time32 is only supported for seconds and milliseconds") + } + DataType::Time64(TimeUnit::Microsecond) => "ttu".to_string(), + DataType::Time64(TimeUnit::Nanosecond) => "ttn".to_string(), + DataType::Time64(_) => { + unreachable!("Time64 is only supported for micro and nanoseconds") + } + DataType::Duration(TimeUnit::Second) => "tDs".to_string(), + DataType::Duration(TimeUnit::Millisecond) => "tDm".to_string(), + DataType::Duration(TimeUnit::Microsecond) => "tDu".to_string(), + DataType::Duration(TimeUnit::Nanosecond) => "tDn".to_string(), + DataType::Interval(IntervalUnit::YearMonth) => "tiM".to_string(), + DataType::Interval(IntervalUnit::DayTime) => "tiD".to_string(), DataType::Timestamp(unit, tz) => { let unit = match unit { - TimeUnit::Second => "s", - TimeUnit::Millisecond => "m", - TimeUnit::Microsecond => "u", - TimeUnit::Nanosecond => "n", + TimeUnit::Second => "s".to_string(), + TimeUnit::Millisecond => "m".to_string(), + TimeUnit::Microsecond => "u".to_string(), + TimeUnit::Nanosecond => "n".to_string(), }; - return Ok(format!( + format!( "ts{}:{}", unit, tz.as_ref().map(|x| x.as_ref()).unwrap_or("") - )); + ) } - DataType::Decimal(precision, scale) => return Ok(format!("d:{},{}", precision, scale)), - DataType::List(_) => "+l", - DataType::LargeList(_) => "+L", - DataType::Struct(_) => "+s", - DataType::FixedSizeBinary(size) => return Ok(format!("w{}", size)), - DataType::FixedSizeList(_, size) => return Ok(format!("+w:{}", size)), + DataType::Decimal(precision, scale) => format!("d:{},{}", precision, scale), + DataType::List(_) => "+l".to_string(), + DataType::LargeList(_) => "+L".to_string(), + DataType::Struct(_) => "+s".to_string(), + DataType::FixedSizeBinary(size) => format!("w{}", size), + DataType::FixedSizeList(_, size) => format!("+w:{}", 
size), DataType::Union(f, ids, is_sparse) => { let sparsness = if *is_sparse { 's' } else { 'd' }; let mut r = format!("+u{}:", sparsness); @@ -346,10 +353,21 @@ fn to_format(data_type: &DataType) -> Result { }; let ids = &ids[..ids.len() - 1]; // take away last "," r.push_str(ids); - return Ok(r); + r } - DataType::Dictionary(index, _) => return to_format(index.as_ref()), - _ => todo!(), + DataType::Dictionary(index, _) => to_format(index.as_ref()), + } +} + +pub(super) fn get_field_child(field: &Field, index: usize) -> Result { + match (index, field.data_type()) { + (0, DataType::List(field)) => Ok(field.as_ref().clone()), + (0, DataType::LargeList(field)) => Ok(field.as_ref().clone()), + (index, DataType::Struct(fields)) => Ok(fields[index].clone()), + (index, DataType::Union(fields, _, _)) => Ok(fields[index].clone()), + (child, data_type) => Err(ArrowError::Ffi(format!( + "Requested child {} to type {:?} that has no such child", + child, data_type + ))), } - .to_string()) } diff --git a/tests/it/ffi.rs b/tests/it/ffi.rs index fba1a179861..d76cf5b32b3 100644 --- a/tests/it/ffi.rs +++ b/tests/it/ffi.rs @@ -1,38 +1,56 @@ use arrow2::array::*; -use arrow2::datatypes::{DataType, TimeUnit}; -use arrow2::ffi::try_from; +use arrow2::datatypes::{DataType, Field, TimeUnit}; use arrow2::{error::Result, ffi}; use std::sync::Arc; -fn test_release(expected: impl Array + 'static) -> Result<()> { - // create a `ArrowArray` from the data. - let b: Arc = Arc::new(expected); +fn test_round_trip(expected: impl Array + Clone + 'static) -> Result<()> { + let array: Arc = Arc::new(expected.clone()); + let field = Field::new("a", array.data_type().clone(), true); + let expected = Box::new(expected) as Box; + + let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); + let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); - // export the array as 2 pointers. 
- let _ = ffi::export_to_c(b)?; + let array_ptr = Box::into_raw(array_ptr); + let schema_ptr = Box::into_raw(schema_ptr); + unsafe { + ffi::export_array_to_c(array, array_ptr); + ffi::export_field_to_c(&field, schema_ptr); + } + + let array_ptr = unsafe { Box::from_raw(array_ptr) }; + let schema_ptr = unsafe { Box::from_raw(schema_ptr) }; + + // import references + let result_field = ffi::import_field_from_c(schema_ptr.as_ref())?; + let result_array = ffi::import_array_from_c(array_ptr, &result_field)?; + + assert_eq!(&result_array, &expected); + assert_eq!(result_field, field); Ok(()) } -fn test_round_trip(expected: impl Array + Clone + 'static) -> Result<()> { - let b: Arc = Arc::new(expected.clone()); - let expected = Box::new(expected) as Box; - +fn test_round_trip_schema(field: Field) -> Result<()> { // create a `ArrowArray` from the data. - let array = Arc::new(ffi::export_to_c(b)?); + let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + + let schema_ptr = Box::into_raw(schema_ptr); + + unsafe { ffi::export_field_to_c(&field, schema_ptr) }; - let (_, _) = array.references(); + let schema_ptr = unsafe { Box::from_raw(schema_ptr) }; - let result = try_from(array)?; + let result = ffi::import_field_from_c(schema_ptr.as_ref())?; - assert_eq!(&result, &expected); + assert_eq!(result, field); Ok(()) } #[test] fn u32() -> Result<()> { let data = Int32Array::from(&[Some(2), None, Some(1), None]); - test_release(data) + test_round_trip(data) } #[test] @@ -130,3 +148,20 @@ fn dict() -> Result<()> { test_round_trip(array) } + +#[test] +fn schema() -> Result<()> { + let field = Field::new( + "a", + DataType::List(Box::new(Field::new("a", DataType::UInt32, true))), + true, + ); + test_round_trip_schema(field)?; + + let field = Field::new( + "a", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + true, + ); + test_round_trip_schema(field) +}