Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added tests to integration with pyarrow
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 24, 2021
1 parent 664fc93 commit 1fc3e04
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 191 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-ffi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ jobs:
python -m venv venv
source venv/bin/activate
pip install maturin==0.10.2 toml==0.10.1 pyarrow==4.0.0
pip install maturin==0.10.2 toml==0.10.1 pyarrow==5.0.0
maturin develop
python -m unittest discover tests
12 changes: 4 additions & 8 deletions arrow-pyarrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@

[package]
name = "arrow-pyarrow-integration-testing"
description = ""
version = "4.0.0-SNAPSHOT"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
version = "0.0.0"
authors = ["Jorge C. Leitao <jorgecarleitao@gmail.com>", "Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = [ "arrow" ]
edition = "2018"

[lib]
name = "arrow_pyarrow_integration_testing"
crate-type = ["cdylib"]

[dependencies]
arrow2 = { path = "../", default-features = false, features = ["compute"] }
pyo3 = { version = "0.12.1", features = ["extension-module"] }
arrow2 = { path = "../", default-features = false }
pyo3 = { version = "0.14", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
148 changes: 39 additions & 109 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays and fields from and to Python.

Expand All @@ -23,18 +6,11 @@ use std::fmt;
use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::{libc::uintptr_t, prelude::*};

use arrow2::array::{Array, Int64Array};
use arrow2::ffi;
use arrow2::{array::PrimitiveArray, compute};
use arrow2::{
datatypes::{DataType, Field},
error::ArrowError,
};

type ArrayRef = Arc<dyn Array>;
use arrow2::{array::Array, datatypes::Field, error::ArrowError, ffi};

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
Expand Down Expand Up @@ -73,7 +49,7 @@ impl From<PyO3ArrowError> for PyErr {
}
}

fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
// prepare a pointer to receive the Array struct
let array = Box::new(ffi::Ffi_ArrowArray::empty());
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
Expand All @@ -86,7 +62,7 @@ fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
ob.call_method1(
py,
"_export_to_c",
(array_ptr as uintptr_t, schema_ptr as uintptr_t),
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?;
Expand All @@ -95,7 +71,7 @@ fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
Ok(array.into())
}

fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());

Expand All @@ -111,7 +87,7 @@ fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_ptr as uintptr_t, schema_ptr as uintptr_t),
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

unsafe {
Expand All @@ -122,99 +98,58 @@ fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
Ok(array.to_object(py))
}

/// Returns `array + array` of an int64 array.
///
/// The Python object is imported with `to_rust`, downcast to `Int64Array`,
/// added to itself and exported back with `to_py`. When the downcast fails the
/// error is an `ArrowError::Ffi("Expects an int64")` wrapped in `PyO3ArrowError`.
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;

// perform some operation
// the imported array is type-erased, so it must be downcast before the addition
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::Ffi("Expects an int64".to_string()))
})?;
let array =
compute::arithmetics::basic::add::add(array, array).map_err(PyO3ArrowError::from)?;
// wrap in an `Arc` because `to_py` takes an `ArrayRef` (`Arc<dyn Array>`)
let array = Arc::new(array);

// export
to_py(array, py)
}

/// Calls a lambda function that receives and returns an array
/// whose result must be the array multiplied by two.
#[pyfunction]
fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
// create
let array = Arc::new(PrimitiveArray::<i64>::from(vec![Some(1), None, Some(3)]));
let expected = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;
/// Imports a [`Field`] from a Python object through the C data interface.
///
/// `ob` must expose PyArrow's private `_export_to_c(ptr)` method, which writes
/// into the `Ffi_ArrowSchema` allocated here. The filled schema is then
/// converted with `ffi::import_field_from_c`; a conversion failure is returned
/// as a `PyO3ArrowError`.
fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
// prepare a pointer to receive the Schema struct
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());

// to py
let array = to_py(array, py)?;
// only the raw address is handed to Python; the `Box` above keeps ownership
let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;

let array = lambda.call1(py, (array,))?;
// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (schema_ptr as Py_uintptr_t,))?;

let array = to_rust(array, py)?;
let field = ffi::import_field_from_c(schema.as_ref()).map_err(PyO3ArrowError::from)?;

Ok(array == expected)
Ok(field)
}

/// Returns the substring
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
/// Exports `field` to a Python object through the C data interface.
///
/// The field is written into a heap-allocated `Ffi_ArrowSchema` with
/// `ffi::export_field_to_c`, and the address of that struct is passed to
/// `pyarrow.Field._import_from_c`. The resulting Python object is returned.
fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
// turn the box into a raw pointer so its address can cross the FFI boundary;
// ownership is taken back with `Box::from_raw` further down
let schema_ptr = Box::into_raw(schema_ptr);

// substring
let array = compute::substring::substring(array.as_ref(), start, &None)
.map_err(PyO3ArrowError::from)?
.into();
unsafe {
ffi::export_field_to_c(field, schema_ptr);
};

// export
to_py(array, py)
}
let pa = py.import("pyarrow")?;

/// Returns the array concatenated with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = pa
.getattr("Field")?
.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;

// concat
let array = compute::concat::concatenate(&[array.as_ref(), array.as_ref()])
.map_err(PyO3ArrowError::from)?
.into();
// re-create the `Box` from the raw pointer so the allocation is freed when
// this temporary is dropped.
// NOTE(review): each `?` above (`py.import`, `getattr`, `call_method1`)
// returns before this line is reached, so on error the schema allocation
// is never reclaimed.
unsafe { Box::from_raw(schema_ptr) };

// export
to_py(array, py)
Ok(array.to_object(py))
}

/// Round-trips an array: imports the Python object into Rust through the
/// C data interface and exports it straight back to Python, with no
/// operation applied in between.
#[pyfunction]
fn round_trip(array: PyObject, py: Python) -> PyResult<PyObject> {
fn round_trip_array(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = to_rust_array(array, py)?;

// export
to_py(array, py)
to_py_array(array, py)
}

/// Imports `array` into Rust and returns whether it compares equal to the
/// int64 array `[2, None, 6]`.
#[pyfunction]
fn import_primitive(array: PyObject, py: Python) -> PyResult<bool> {
let array = to_rust(array, py)?;
let expected = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;

Ok(array == expected)
}

/// Converts to rust and back to python.
///
/// For `round_trip_field`: the Python object is converted to a Rust `Field`
/// with `to_rust_field` and exported back with `to_py_field`. Despite its
/// name, the `array` parameter is handed to `to_rust_field`, so it is
/// expected to be a field rather than an array.
#[pyfunction]
fn export_primitive(py: Python) -> PyResult<PyObject> {
let array = Arc::new(PrimitiveArray::<i64>::from(vec![Some(2), None, Some(6)])) as ArrayRef;

let array = to_py(array, py)?;
fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let field = to_rust_field(array, py)?;

Ok(array)
// export
to_py_field(&field, py)
}

#[pyfunction]
Expand All @@ -224,13 +159,8 @@ fn total_allocated_bytes() -> PyResult<isize> {

/// Python module definition: registers every `#[pyfunction]` that is exposed
/// to Python on the module `m`.
///
/// NOTE(review): the function name equals `[lib] name` in Cargo.toml —
/// presumably required for the Python import to resolve; confirm before renaming.
#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(total_allocated_bytes))?;
m.add_wrapped(wrap_pyfunction!(export_primitive))?;
m.add_wrapped(wrap_pyfunction!(import_primitive))?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
m.add_wrapped(wrap_pyfunction!(concatenate))?;
m.add_wrapped(wrap_pyfunction!(round_trip))?;
// `add_function` takes the wrapped function built with the module handle `m`
m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
Ok(())
}
Loading

0 comments on commit 1fc3e04

Please sign in to comment.