This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added API to FFI Field #321

Merged · 5 commits · Aug 24, 2021
2 changes: 1 addition & 1 deletion .github/workflows/integration-ffi.yml
@@ -38,6 +38,6 @@ jobs:
python -m venv venv
source venv/bin/activate

pip install maturin==0.10.2 toml==0.10.1 pyarrow==4.0.0
pip install maturin==0.10.2 toml==0.10.1 pyarrow==5.0.0
maturin develop
python -m unittest discover tests
12 changes: 4 additions & 8 deletions arrow-pyarrow-integration-testing/Cargo.toml
@@ -17,22 +17,18 @@

[package]
name = "arrow-pyarrow-integration-testing"
description = ""
version = "4.0.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
version = "0.0.0"
authors = ["Jorge C. Leitao <jorgecarleitao@gmail.com>", "Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = [ "arrow" ]
edition = "2018"

[lib]
name = "arrow_pyarrow_integration_testing"
crate-type = ["cdylib"]

[dependencies]
arrow2 = { path = "../", default-features = false, features = ["compute"] }
pyo3 = { version = "0.12.1", features = ["extension-module"] }
arrow2 = { path = "../", default-features = false }
pyo3 = { version = "0.14", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
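
For orientation, a minimal sketch of how the built extension might be smoke-tested from Python. It assumes the crate has been compiled into the active virtualenv with `maturin develop` (as in the workflow above); the `rust` alias is purely illustrative and not part of the repository.

    import pyarrow  # required at runtime, per requires-dist above
    import arrow_pyarrow_integration_testing as rust

    # The module should expose the functions registered in the #[pymodule] further down.
    assert callable(rust.round_trip_array)
    assert callable(rust.round_trip_field)

    # Allocation counter exposed by the module.
    print(rust.total_allocated_bytes())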
173 changes: 62 additions & 111 deletions arrow-pyarrow-integration-testing/src/lib.rs
@@ -1,20 +1,3 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.

@@ -23,15 +6,11 @@ use std::fmt;
use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::{libc::uintptr_t, prelude::*};

use arrow2::array::{Array, Int64Array};
use arrow2::ffi;
use arrow2::{array::PrimitiveArray, compute};
use arrow2::{datatypes::DataType, error::ArrowError};

type ArrayRef = Arc<dyn Array>;
use arrow2::{array::Array, datatypes::Field, error::ArrowError, ffi};

/// An error that bridges ArrowError with a Python error
#[derive(Debug)]
@@ -70,130 +49,107 @@ impl From<PyO3ArrowError> for PyErr {
}
}

fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
// prepare pointers to receive the Array and Schema structs
let array = Arc::new(ffi::create_empty());
let (array_ptr, schema_ptr) = array.references();
let array = Box::new(ffi::Ffi_ArrowArray::empty());
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());

let array_ptr = &*array as *const ffi::Ffi_ArrowArray;
let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(
py,
"_export_to_c",
(array_ptr as uintptr_t, schema_ptr as uintptr_t),
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

Ok(ffi::try_from(array).map_err(PyO3ArrowError::from)?.into())
let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?;
let array = ffi::import_array_from_c(array, &field).map_err(PyO3ArrowError::from)?;

Ok(array.into())
}

fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let array_ptr = ffi::export_to_c(array).map_err(PyO3ArrowError::from)?;
fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());

let array_ptr = Box::into_raw(array_ptr);
let schema_ptr = Box::into_raw(schema_ptr);

let (array_ptr, schema_ptr) = array_ptr.references();
unsafe {
ffi::export_field_to_c(&Field::new("", array.data_type().clone(), true), schema_ptr);
ffi::export_array_to_c(array, array_ptr);
};

let pa = py.import("pyarrow")?;

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_ptr as uintptr_t, schema_ptr as uintptr_t),
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

Ok(array.to_object(py))
}
unsafe {
Box::from_raw(array_ptr);
Box::from_raw(schema_ptr);
};

/// Returns `array + array` of an int64 array.
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;

// perform some operation
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::Ffi("Expects an int64".to_string()))
})?;
let array =
compute::arithmetics::basic::add::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);

// export
to_py(array, py)
Ok(array.to_object(py))
}

/// calls a lambda function that receives and returns an array
/// whose result must be the array multiplied by two
#[pyfunction]
fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
// create
let array = Arc::new(PrimitiveArray::<i64>::from(vec![Some(1), None, Some(3)]));
let expected = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;
fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
// prepare a pointer to receive the Schema struct describing the Field
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());

// to py
let array = to_py(array, py)?;
let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;

let array = lambda.call1(py, (array,))?;
// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (schema_ptr as Py_uintptr_t,))?;

let array = to_rust(array, py)?;
let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?;

Ok(array == expected)
Ok(field)
}

/// Returns the substring
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let schema_ptr = Box::into_raw(schema_ptr);

// substring
let array = compute::substring::substring(array.as_ref(), start, &None)
.map_err(PyO3ArrowError::from)?
.into();
unsafe {
ffi::export_field_to_c(field, schema_ptr);
};

// export
to_py(array, py)
}
let pa = py.import("pyarrow")?;

/// Returns the concatenation of the array with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = pa
.getattr("Field")?
.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;

// concat
let array = compute::concat::concatenate(&[array.as_ref(), array.as_ref()])
.map_err(PyO3ArrowError::from)?
.into();
unsafe { Box::from_raw(schema_ptr) };

// export
to_py(array, py)
Ok(array.to_object(py))
}

/// Converts to rust and back to python
#[pyfunction]
fn round_trip(array: PyObject, py: Python) -> PyResult<PyObject> {
fn round_trip_array(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = to_rust_array(array, py)?;

// export
to_py(array, py)
to_py_array(array, py)
}

/// Converts to rust and back to python
#[pyfunction]
fn import_primitive(array: PyObject, py: Python) -> PyResult<bool> {
let array = to_rust(array, py)?;
let expected = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;

Ok(array == expected)
}

/// Converts to rust and back to python
#[pyfunction]
fn export_primitive(py: Python) -> PyResult<PyObject> {
let array = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;

let array = to_py(array, py)?;
fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let field = to_rust_field(array, py)?;

Ok(array)
// export
to_py_field(&field, py)
}

#[pyfunction]
@@ -203,13 +159,8 @@ fn total_allocated_bytes() -> PyResult<isize> {

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(total_allocated_bytes))?;
m.add_wrapped(wrap_pyfunction!(export_primitive))?;
m.add_wrapped(wrap_pyfunction!(import_primitive))?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
m.add_wrapped(wrap_pyfunction!(concatenate))?;
m.add_wrapped(wrap_pyfunction!(round_trip))?;
m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
Ok(())
}
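
To make the new round-trip API concrete, here is a hedged sketch of the kind of Python-side test it enables; the test names and exact assertions are assumptions, and the repository's actual tests under `tests/` may differ.

    import unittest

    import pyarrow as pa
    import arrow_pyarrow_integration_testing as rust


    class TestRoundTrip(unittest.TestCase):
        def test_array(self):
            # Array goes Python -> Rust (ffi::import_array_from_c) -> Python (Array._import_from_c).
            a = pa.array([1, None, 3], type=pa.int64())
            b = rust.round_trip_array(a)
            self.assertTrue(a.equals(b))

        def test_field(self):
            # Field goes Python -> Rust (ffi::import_field_from_c) -> Python (Field._import_from_c),
            # which is the new API this PR adds.
            f = pa.field("int64", pa.int64(), nullable=True)
            g = rust.round_trip_field(f)
            self.assertTrue(f.equals(g))


    if __name__ == "__main__":
        unittest.main()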