2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/Cargo.toml
@@ -34,4 +34,4 @@ crate-type = ["cdylib"]

[dependencies]
arrow = { path = "../arrow", features = ["pyarrow"] }
pyo3 = { version = "0.26.0", features = ["extension-module"] }
pyo3 = { version = "0.27.1", features = ["extension-module"] }
2 changes: 1 addition & 1 deletion arrow-pyarrow-testing/Cargo.toml
@@ -48,4 +48,4 @@ publish = false
# Note no dependency on arrow, to ensure arrow-pyarrow can be used by itself
arrow-array = { path = "../arrow-array" }
arrow-pyarrow = { path = "../arrow-pyarrow" }
pyo3 = { version = "0.26.0", default-features = false }
pyo3 = { version = "0.27.1", default-features = false }
2 changes: 1 addition & 1 deletion arrow-pyarrow/Cargo.toml
@@ -39,4 +39,4 @@ all-features = true
arrow-array = { workspace = true, features = ["ffi"] }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
pyo3 = { version = "0.26.0", default-features = false }
pyo3 = { version = "0.27.1", default-features = false }
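
All three manifests move pyo3 from 0.26.0 to 0.27.1. The integration-testing crate is a `cdylib` built with pyo3's `extension-module` feature, i.e. a Python-importable module. A minimal sketch of what such an entry point looks like, assuming pyo3's current `#[pymodule]`/`Bound<PyModule>` API; the module and function names are illustrative, not taken from this PR:

```rust
// Hypothetical extension-module entry point; names are illustrative only.
use pyo3::prelude::*;

/// Return the crate version to Python callers (purely illustrative).
#[pyfunction]
fn crate_version() -> String {
    env!("CARGO_PKG_VERSION").to_string()
}

/// The module exported by the cdylib; built with `features = ["extension-module"]`.
#[pymodule]
fn example_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(crate_version, m)?)?;
    Ok(())
}
```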
104 changes: 73 additions & 31 deletions arrow-pyarrow/src/lib.rs
@@ -57,6 +57,7 @@
//! and then importing the reader as a [ArrowArrayStreamReader].

use std::convert::{From, TryFrom};
use std::ffi::CStr;
use std::ptr::{addr_of, addr_of_mut};
use std::sync::Arc;

@@ -80,6 +81,10 @@ import_exception!(pyarrow, ArrowException);
/// Represents an exception raised by PyArrow.
pub type PyArrowException = ArrowException;

const ARROW_ARRAY_STREAM_CAPSULE_NAME: &CStr = c"arrow_array_stream";
const ARROW_SCHEMA_CAPSULE_NAME: &CStr = c"arrow_schema";
const ARROW_ARRAY_CAPSULE_NAME: &CStr = c"arrow_array";

fn to_py_err(err: ArrowError) -> PyErr {
PyArrowException::new_err(err.to_string())
}
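
The three new `&CStr` constants spell out the capsule names required by the Arrow PyCapsule Interface (`arrow_schema`, `arrow_array`, `arrow_array_stream`), which `validate_pycapsule` below checks against. A standalone sketch of that naming check, using only `std`; the helper is illustrative, not part of this crate:

```rust
// Standalone illustration of the capsule-name convention; `expect_capsule_name`
// is a hypothetical helper, not an API of arrow-pyarrow.
use std::ffi::CStr;

const ARROW_SCHEMA_CAPSULE_NAME: &CStr = c"arrow_schema";

fn expect_capsule_name(actual: &CStr, expected: &CStr) -> Result<(), String> {
    if actual != expected {
        return Err(format!("Expected capsule name {expected:?}, got {actual:?}"));
    }
    Ok(())
}

fn main() {
    assert!(expect_capsule_name(c"arrow_schema", ARROW_SCHEMA_CAPSULE_NAME).is_ok());
    assert!(expect_capsule_name(c"arrow_array", ARROW_SCHEMA_CAPSULE_NAME).is_err());
}
```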
@@ -136,7 +141,7 @@ fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
));
}

let capsule_name = capsule_name.unwrap().to_str()?;
let capsule_name = unsafe { capsule_name.unwrap().as_cstr().to_str()? };
if capsule_name != name {
return Err(PyValueError::new_err(format!(
"Expected name '{name}' in PyCapsule, instead got '{capsule_name}'",
@@ -153,12 +158,16 @@ impl FromPyArrow for DataType {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(dtype);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let dtype = DataType::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(dtype);
}
}

validate_class("DataType", value)?;
@@ -189,12 +198,16 @@ impl FromPyArrow for Field {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(field);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let field = Field::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(field);
}
}

validate_class("Field", value)?;
@@ -225,12 +238,16 @@ impl FromPyArrow for Schema {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_schema__")? {
let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_schema")?;

let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
return Ok(schema);
let schema_ptr = capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
unsafe {
let schema = Schema::try_from(schema_ptr.as_ref()).map_err(to_py_err)?;
return Ok(schema);
}
}

validate_class("Schema", value)?;
@@ -269,16 +286,25 @@ impl FromPyArrow for ArrayData {
}

let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.downcast::<PyCapsule>()?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
let array = unsafe {
FFI_ArrowArray::from_raw(
array_capsule
.pointer_checked(Some(ARROW_ARRAY_CAPSULE_NAME))?
.cast::<FFI_ArrowArray>()
.as_ptr(),
)
};
return unsafe { ffi::from_ffi(array, schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class("Array", value)?;
@@ -322,7 +348,7 @@ impl ToPyArrow for ArrayData {

impl<T: FromPyArrow> FromPyArrow for Vec<T> {
fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
let list = value.downcast::<PyList>()?;
let list = value.cast::<PyList>()?;
list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
}
}
@@ -342,6 +368,7 @@ impl FromPyArrow for RecordBatch {
// Newer versions of PyArrow as well as other libraries with Arrow data implement this
// method, so prefer it over _export_to_c.
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

if value.hasattr("__arrow_c_array__")? {
let tuple = value.getattr("__arrow_c_array__")?.call0()?;

@@ -352,17 +379,22 @@
}

let schema_capsule = tuple.get_item(0)?;
let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
let schema_capsule = schema_capsule.cast::<PyCapsule>()?;
let array_capsule = tuple.get_item(1)?;
let array_capsule = array_capsule.downcast::<PyCapsule>()?;
let array_capsule = array_capsule.cast::<PyCapsule>()?;

validate_pycapsule(schema_capsule, "arrow_schema")?;
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let schema_ptr = schema_capsule
.pointer_checked(Some(ARROW_SCHEMA_CAPSULE_NAME))?
.cast::<FFI_ArrowSchema>();
let array_ptr = array_capsule
.pointer_checked(Some(ARROW_ARRAY_CAPSULE_NAME))?
.cast::<FFI_ArrowArray>();
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_ptr.as_ptr()) };
let mut array_data =
unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
unsafe { ffi::from_ffi(ffi_array, schema_ptr.as_ref()) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
"Expected Struct type from __arrow_c_array.",
@@ -377,7 +409,8 @@ impl FromPyArrow for RecordBatch {
let array = StructArray::from(array_data);
// StructArray does not embed metadata from schema. We need to override
// the output schema with the schema from the capsule.
let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
let schema =
unsafe { Arc::new(Schema::try_from(schema_ptr.as_ref()).map_err(to_py_err)?) };
let (_fields, columns, nulls) = array.into_parts();
assert_eq!(
nulls.map(|n| n.null_count()).unwrap_or_default(),
@@ -394,7 +427,7 @@

let arrays = value.getattr("columns")?;
let arrays = arrays
.downcast::<PyList>()?
.cast::<PyList>()?
.iter()
.map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
.collect::<PyResult<_>>()?;
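
Whichever path is taken, the conversion ends by pairing a `SchemaRef` with a `Vec<ArrayRef>`. An illustrative sketch of that final assembly with `RecordBatch::try_new`; this mirrors the shape of the data, not the elided lines of this diff:

```rust
// Illustrative RecordBatch assembly from a schema plus columns; not copied from
// the elided portion of this change.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
    ];
    // try_new validates that column types and lengths match the schema.
    let batch = RecordBatch::try_new(schema, columns).expect("schema and columns must agree");
    assert_eq!(batch.num_rows(), 3);
}
```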
@@ -429,10 +462,17 @@ impl FromPyArrow for ArrowArrayStreamReader {
// See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
if value.hasattr("__arrow_c_stream__")? {
let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
let capsule = capsule.downcast::<PyCapsule>()?;
let capsule = capsule.cast::<PyCapsule>()?;
validate_pycapsule(capsule, "arrow_array_stream")?;

let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
let stream = unsafe {
FFI_ArrowArrayStream::from_raw(
capsule
.pointer_checked(Some(ARROW_ARRAY_STREAM_CAPSULE_NAME))?
.cast::<FFI_ArrowArrayStream>()
.as_ptr(),
)
};

let stream_reader = ArrowArrayStreamReader::try_new(stream)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
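
Once constructed, `ArrowArrayStreamReader` implements `RecordBatchReader`, so the imported stream can be drained like any other batch reader. A small illustrative helper (not part of the crate):

```rust
// Illustrative consumer for any RecordBatchReader, including the
// ArrowArrayStreamReader produced above; `total_rows` is not a crate API.
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatchReader;

fn total_rows<R: RecordBatchReader>(reader: R) -> Result<usize, ArrowError> {
    let mut rows = 0;
    // RecordBatchReader is an Iterator of Result<RecordBatch, ArrowError>.
    for batch in reader {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```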
@@ -492,9 +532,11 @@ impl IntoPyArrow for ArrowArrayStreamReader {
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);

impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
Ok(Self(T::from_pyarrow_bound(value)?))
impl<T: FromPyArrow> FromPyObject<'_, '_> for PyArrowType<T> {
type Error = PyErr;

fn extract(value: Borrowed<'_, '_, PyAny>) -> PyResult<Self> {
Ok(Self(T::from_pyarrow_bound(&*value)?))
}
}

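`PyArrowType<T>` is what lets `FromPyArrow`/`IntoPyArrow` types appear directly in `#[pyfunction]` signatures, and the impl above is the part ported to pyo3 0.27's reworked `FromPyObject` (two lifetimes plus an associated `Error` type). A hedged usage sketch, assuming the `arrow` crate with its `pyarrow` feature; the function is illustrative:

```rust
// Illustrative #[pyfunction] built on PyArrowType; assumes `arrow` with the
// `pyarrow` feature (which exposes arrow-pyarrow as arrow::pyarrow).
use arrow::pyarrow::PyArrowType;
use arrow::record_batch::RecordBatch;
use pyo3::prelude::*;

/// Accept anything exposing the Arrow PyCapsule array protocol (e.g. a pyarrow
/// RecordBatch) and report its row count. Purely illustrative.
#[pyfunction]
fn row_count(batch: PyArrowType<RecordBatch>) -> PyResult<usize> {
    Ok(batch.0.num_rows())
}
```

From Python, any object implementing `__arrow_c_array__` (such as a `pyarrow.RecordBatch`) can be passed to such a function directly.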