From bd61239a32c94e37b9510071c0ffacad455798c0 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Wed, 18 Oct 2023 04:44:50 -0700
Subject: [PATCH] GH-35531: [Python] C Data Interface PyCapsule Protocol
 (#37797)

### Rationale for this change

### What changes are included in this PR?

* A new specification for Arrow PyCapsules and related dunder methods
* Implementing the dunder methods for `DataType`, `Field`, `Schema`, `Array`, `RecordBatch`, `Table`, and `RecordBatchReader`.

### Are these changes tested?

Yes, I've added various roundtrip tests for each of the types.

### Are there any user-facing changes?

This introduces some new APIs and documents them.

* Closes: #34031
* Closes: #35531

Authored-by: Joris Van den Bossche
Signed-off-by: Antoine Pitrou
---
 docs/source/format/CDataInterface.rst         |  11 +
 .../CDataInterface/PyCapsuleInterface.rst     | 433 ++++++++++++++++++
 docs/source/python/extending_types.rst        |   9 +
 docs/source/python/interchange_protocol.rst   |   2 +
 python/pyarrow/array.pxi                      |  81 +++-
 python/pyarrow/includes/libarrow.pxd          |   6 +-
 python/pyarrow/ipc.pxi                        |  68 ++-
 python/pyarrow/table.pxi                      | 148 +++++-
 python/pyarrow/tests/test_array.py            |  18 +
 python/pyarrow/tests/test_cffi.py             | 126 ++++-
 python/pyarrow/tests/test_table.py            |  87 ++++
 python/pyarrow/tests/test_types.py            |  14 +
 python/pyarrow/types.pxi                      | 185 +++++++-
 13 files changed, 1177 insertions(+), 11 deletions(-)
 create mode 100644 docs/source/format/CDataInterface/PyCapsuleInterface.rst

diff --git a/docs/source/format/CDataInterface.rst b/docs/source/format/CDataInterface.rst
index 8f491470965b0..e0884686acf6c 100644
--- a/docs/source/format/CDataInterface.rst
+++ b/docs/source/format/CDataInterface.rst
@@ -990,3 +990,14 @@ adaptation cost.
 
 .. _Python buffer protocol: https://www.python.org/dev/peps/pep-3118/
+
+Language-specific protocols
+===========================
+
+Some languages may define additional protocols on top of the Arrow C data
+interface.
+
+.. toctree::
+   :maxdepth: 1
+
+   CDataInterface/PyCapsuleInterface
diff --git a/docs/source/format/CDataInterface/PyCapsuleInterface.rst b/docs/source/format/CDataInterface/PyCapsuleInterface.rst
new file mode 100644
index 0000000000000..263c428c1ef0e
--- /dev/null
+++ b/docs/source/format/CDataInterface/PyCapsuleInterface.rst
@@ -0,0 +1,433 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+
+=============================
+The Arrow PyCapsule Interface
+=============================
+
+.. warning:: The Arrow PyCapsule Interface should be considered experimental
+
+Rationale
+=========
+
+The :ref:`C data interface <c-data-interface>` and
+:ref:`C stream interface <c-stream-interface>` allow moving Arrow data between
+different implementations of Arrow. However, these interfaces don't specify how
+Python libraries should expose these structs to other libraries. Prior to this,
+many libraries simply provided export to PyArrow data structures, using the
+``_import_from_c`` and ``_export_to_c`` methods. However, this always required
+PyArrow to be installed. In addition, those APIs could cause memory leaks if
+handled improperly.
+
+This interface allows any library to export Arrow data structures to other
+libraries that understand the same protocol.
+
+Goals
+-----
+
+* Standardize the `PyCapsule`_ objects that represent ``ArrowSchema``, ``ArrowArray``,
+  and ``ArrowArrayStream``.
+* Define standard methods that export Arrow data into such capsule objects,
+  so that any Python library wanting to accept Arrow data as input can call the
+  corresponding method instead of hardcoding support for specific Arrow
+  producers.
+
+
+Non-goals
+---------
+
+* Standardize what public APIs should be used for import. This is left up to
+  individual libraries.
+
+PyCapsule Standard
+==================
+
+When exporting Arrow data through Python, the C Data Interface / C Stream Interface
+structures should be wrapped in capsules. Capsules avoid invalid access by
+attaching a name to the pointer and avoid memory leaks by attaching a destructor.
+Thus, they are much safer than passing pointers as integers.
+
+`PyCapsule`_ allows for a ``name`` to be associated with the capsule, allowing
+consumers to verify that the capsule contains the expected kind of data. To make sure
+Arrow structures are recognized, the following names must be used:
+
+.. list-table::
+   :widths: 25 25
+   :header-rows: 1
+
+   * - C Interface Type
+     - PyCapsule Name
+   * - ArrowSchema
+     - ``arrow_schema``
+   * - ArrowArray
+     - ``arrow_array``
+   * - ArrowArrayStream
+     - ``arrow_array_stream``
+
+
+Lifetime Semantics
+------------------
+
+The exported PyCapsules should have a destructor that calls the
+:ref:`release callback <c-data-interface-released>`
+of the Arrow struct, if it is not already null. This prevents a memory leak in
+case the capsule was never passed to another consumer.
+
+If the capsule has been passed to a consumer, the consumer should have moved
+the data and marked the release callback as null, so there isn't a risk of
+releasing data the consumer is using.
+:ref:`Read more in the C Data Interface specification <c-data-interface-semantics>`.
+
+Just like in the C Data Interface, the PyCapsule objects defined here can only
+be consumed once.
+
+For an example of a PyCapsule with a destructor, see `Create a PyCapsule`_.
+
+
+Export Protocol
+===============
+
+The interface consists of three separate protocols:
+
+* ``ArrowSchemaExportable``, which defines the ``__arrow_c_schema__`` method.
+* ``ArrowArrayExportable``, which defines the ``__arrow_c_array__`` method.
+* ``ArrowStreamExportable``, which defines the ``__arrow_c_stream__`` method.
+
+ArrowSchema Export
+------------------
+
+Schemas, fields, and data types can implement the method ``__arrow_c_schema__``.
+
+.. py:method:: __arrow_c_schema__(self) -> object
+
+    Export the object as an ArrowSchema.
+
+    :return: A PyCapsule containing a C ArrowSchema representation of the
+        object. The capsule must have a name of ``"arrow_schema"``.
+
+
+ArrowArray Export
+-----------------
+
+Arrays and record batches (contiguous tables) can implement the method
+``__arrow_c_array__``.
+
+.. py:method:: __arrow_c_array__(self, requested_schema: object | None = None) -> Tuple[object, object]
+
+    Export the object as a pair of ArrowSchema and ArrowArray structures.
+
+    :param requested_schema: A PyCapsule containing a C ArrowSchema representation
+        of a requested schema. Conversion to this schema is best-effort. See
+        `Schema Requests`_.
+    :type requested_schema: PyCapsule or None
+
+    :return: A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
+        respectively. The schema capsule should have the name ``"arrow_schema"``
+        and the array capsule should have the name ``"arrow_array"``.
+
+
+ArrowStream Export
+------------------
+
+Tables / DataFrames and streams can implement the method ``__arrow_c_stream__``.
+
+.. py:method:: __arrow_c_stream__(self, requested_schema: object | None = None) -> object
+
+    Export the object as an ArrowArrayStream.
+
+    :param requested_schema: A PyCapsule containing a C ArrowSchema representation
+        of a requested schema. Conversion to this schema is best-effort. See
+        `Schema Requests`_.
+    :type requested_schema: PyCapsule or None
+
+    :return: A PyCapsule containing a C ArrowArrayStream representation of the
+        object. The capsule must have a name of ``"arrow_array_stream"``.
+
+Schema Requests
+---------------
+
+In some cases, there might be multiple possible Arrow representations of the
+same data. For example, a library might have a single integer type, but Arrow
+has multiple integer types with different sizes and signedness. As another
+example, Arrow has several possible encodings for an array of strings: 32-bit
+offsets, 64-bit offsets, string view, and dictionary-encoded. A sequence of
+strings could export to any one of these Arrow representations.
+
+In order to allow the caller to request a specific representation, the
+:meth:`__arrow_c_array__` and :meth:`__arrow_c_stream__` methods take an optional
+``requested_schema`` parameter. This parameter is a PyCapsule containing an
+``ArrowSchema``.
+
+The callee should attempt to provide the data in the requested schema. However,
+if the callee cannot provide the data in the requested schema, they may return
+the data with the same schema as if ``None`` were passed to ``requested_schema``.
+
+If the caller requests a schema that is not compatible with the data,
+say requesting a schema with a different number of fields, the callee should
+raise an exception. The requested schema mechanism is only meant to negotiate
+between different representations of the same data and not to allow arbitrary
+schema transformations.
+
+
+.. _PyCapsule: https://docs.python.org/3/c-api/capsule.html
+
+
+Protocol Typehints
+------------------
+
+The following typehints can be copied into your library to annotate that a
+function accepts an object implementing one of these protocols.
+
+.. code-block:: python
+
+    from typing import Tuple, Protocol
+
+    class ArrowSchemaExportable(Protocol):
+        def __arrow_c_schema__(self) -> object: ...
+
+    class ArrowArrayExportable(Protocol):
+        def __arrow_c_array__(
+            self,
+            requested_schema: object | None = None
+        ) -> Tuple[object, object]:
+            ...
+
+    class ArrowStreamExportable(Protocol):
+        def __arrow_c_stream__(
+            self,
+            requested_schema: object | None = None
+        ) -> object:
+            ...
+
+Examples
+========
+
+Create a PyCapsule
+------------------
+
+To create a PyCapsule, use the `PyCapsule_New
+<https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_New>`_
+function. The function must be passed a destructor function that will be called
+to release the data the capsule points to. The destructor must first call the
+release callback if it is not null, then free the struct.
+
+Below is the code to create a PyCapsule for an ``ArrowSchema``. The code for
+``ArrowArray`` and ``ArrowArrayStream`` is similar.
+
+.. tab-set::
+
+   .. tab-item:: C
+
+      .. code-block:: c
+
+         #include <Python.h>
+
+         void ReleaseArrowSchemaPyCapsule(PyObject* capsule) {
+             struct ArrowSchema* schema =
+                 (struct ArrowSchema*)PyCapsule_GetPointer(capsule, "arrow_schema");
+             if (schema->release != NULL) {
+                 schema->release(schema);
+             }
+             free(schema);
+         }
+
+         PyObject* ExportArrowSchemaPyCapsule() {
+             struct ArrowSchema* schema =
+                 (struct ArrowSchema*)malloc(sizeof(struct ArrowSchema));
+             // Fill in ArrowSchema fields
+             // ...
+             return PyCapsule_New(schema, "arrow_schema", ReleaseArrowSchemaPyCapsule);
+         }
+
+   .. tab-item:: Cython
+
+      .. code-block:: cython
+
+         cimport cpython
+         from libc.stdlib cimport malloc, free
+
+         cdef void release_arrow_schema_py_capsule(object schema_capsule):
+             cdef ArrowSchema* schema = <ArrowSchema*>cpython.PyCapsule_GetPointer(
+                 schema_capsule, 'arrow_schema'
+             )
+             if schema.release != NULL:
+                 schema.release(schema)
+
+             free(schema)
+
+         cdef object export_arrow_schema_py_capsule():
+             cdef ArrowSchema* schema = <ArrowSchema*>malloc(sizeof(ArrowSchema))
+             # It's recommended to immediately wrap the struct in a capsule, so
+             # if subsequent lines raise an exception memory will not be leaked.
+             schema.release = NULL
+             capsule = cpython.PyCapsule_New(
+                 schema, 'arrow_schema', release_arrow_schema_py_capsule
+             )
+             # Fill in ArrowSchema fields:
+             # schema.format = ...
+             # ...
+             return capsule
+
+
+Consume a PyCapsule
+-------------------
+
+To consume a PyCapsule, use the `PyCapsule_GetPointer
+<https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_GetPointer>`_ function
+to get the pointer to the underlying struct. Import the struct using your
+system's Arrow C Data Interface import function. Only after that should the
+capsule be freed.
+
+The example below shows how to consume a PyCapsule for an ``ArrowSchema``. The
+code for ``ArrowArray`` and ``ArrowArrayStream`` is similar.
+
+.. tab-set::
+
+   .. tab-item:: C
+
+      .. code-block:: c
+
+         #include <Python.h>
+
+         // If the capsule is not an ArrowSchema, this returns NULL and sets
+         // a Python exception.
+         struct ArrowSchema* GetArrowSchemaPyCapsule(PyObject* capsule) {
+             return PyCapsule_GetPointer(capsule, "arrow_schema");
+         }
+
+   .. tab-item:: Cython
+
+      .. code-block:: cython
+
+         cimport cpython
+
+         cdef ArrowSchema* get_arrow_schema_py_capsule(object capsule) except NULL:
+             return <ArrowSchema*>cpython.PyCapsule_GetPointer(capsule, 'arrow_schema')
+
+Backwards Compatibility with PyArrow
+------------------------------------
+
+When interacting with PyArrow, the PyCapsule interface should be preferred over
+the ``_export_to_c`` and ``_import_from_c`` methods. However, many libraries will
+want to support a range of PyArrow versions. This can be done via duck typing.
+
+For example, if your library had an import method such as:
+
+.. code-block:: python
+
+   # OLD METHOD
+   def from_arrow(arr: pa.Array):
+       array_import_ptr = make_array_import_ptr()
+       schema_import_ptr = make_schema_import_ptr()
+       arr._export_to_c(array_import_ptr, schema_import_ptr)
+       return import_c_data(array_import_ptr, schema_import_ptr)
+
+You can rewrite this method to support both PyArrow and other libraries that
+implement the PyCapsule interface:
+
+.. code-block:: python
+
+   # NEW METHOD
+   def from_arrow(arr):
+       # Newer versions of PyArrow as well as other libraries with Arrow data
+       # implement this method, so prefer it over _export_to_c.
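+       # (Per the lifetime semantics above, the returned capsules own their
+       # C structs and release them in their destructors if an error occurs
+       # before they are consumed, so no manual cleanup is needed here.)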
+       if hasattr(arr, "__arrow_c_array__"):
+           schema_ptr, array_ptr = arr.__arrow_c_array__()
+           return import_c_capsule_data(schema_ptr, array_ptr)
+       elif isinstance(arr, pa.Array):
+           # Deprecated method, used for older versions of PyArrow
+           array_import_ptr = make_array_import_ptr()
+           schema_import_ptr = make_schema_import_ptr()
+           arr._export_to_c(array_import_ptr, schema_import_ptr)
+           return import_c_data(array_import_ptr, schema_import_ptr)
+       else:
+           raise TypeError(f"Cannot import {type(arr)} as Arrow array data.")
+
+You may also wish to accept objects implementing the protocol in your
+constructors. For example, in PyArrow, the :func:`array` and :func:`record_batch`
+constructors accept any object that implements the :meth:`__arrow_c_array__`
+protocol method. Similarly, PyArrow's :func:`schema` constructor accepts any
+object that implements the :meth:`__arrow_c_schema__` method.
+
+Now if your library has a function exporting to PyArrow, such as:
+
+.. code-block:: python
+
+   # OLD METHOD
+   def to_arrow(self) -> pa.Array:
+       array_export_ptr = make_array_export_ptr()
+       schema_export_ptr = make_schema_export_ptr()
+       self.export_c_data(array_export_ptr, schema_export_ptr)
+       return pa.Array._import_from_c(array_export_ptr, schema_export_ptr)
+
+You can rewrite this function to use the PyCapsule interface by passing your
+object to the :py:func:`array` constructor, which accepts any object that
+implements the protocol. An easy way to check if the PyArrow version is new
+enough to support this is to check whether ``pa.Array`` has the
+``__arrow_c_array__`` method.
+
+.. code-block:: python
+
+   # NEW METHOD
+   def to_arrow(self) -> pa.Array:
+       # PyArrow added support for constructing arrays from objects implementing
+       # __arrow_c_array__ in the same version it added the method for its own
+       # arrays. So we can use hasattr to check if the method is available as
+       # a proxy for checking the PyArrow version.
+       if hasattr(pa.Array, "__arrow_c_array__"):
+           return pa.array(self)
+       else:
+           array_export_ptr = make_array_export_ptr()
+           schema_export_ptr = make_schema_export_ptr()
+           self.export_c_data(array_export_ptr, schema_export_ptr)
+           return pa.Array._import_from_c(array_export_ptr, schema_export_ptr)
+
+
+Comparison with Other Protocols
+===============================
+
+Comparison to DataFrame Interchange Protocol
+--------------------------------------------
+
+`The DataFrame Interchange Protocol
+<https://data-apis.org/dataframe-protocol/latest/index.html>`_
+is another protocol in Python that allows for the sharing of data between libraries.
+This protocol is complementary to the DataFrame Interchange Protocol. Many of
+the objects that implement this protocol will also implement the DataFrame
+Interchange Protocol.
+
+This protocol is specific to Arrow-based data structures, while the DataFrame
+Interchange Protocol allows non-Arrow data frames and arrays to be shared as well.
+Because of this, these PyCapsules can support Arrow-specific features such as
+nested columns.
+
+This protocol is also much more minimal than the DataFrame Interchange Protocol.
+It just handles data export, rather than defining accessors for details like
+number of rows or columns.
+
+In summary, if you are implementing this protocol, you should also consider
+implementing the DataFrame Interchange Protocol.
+
+
+Comparison to ``__arrow_array__`` protocol
+------------------------------------------
+
+The :ref:`arrow_array_protocol` protocol is a dunder method that
+defines how PyArrow should import an object as an Arrow array.
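+For example, a class can opt in to conversion by :func:`pyarrow.array` by
+defining the method (a minimal sketch; the ``MyColumn`` class and its
+``values`` attribute are hypothetical stand-ins for a library's own types):
+
+.. code-block:: python
+
+   import pyarrow as pa
+
+   class MyColumn:
+       def __init__(self, values):
+           self.values = values
+
+       def __arrow_array__(self, type=None):
+           # pa.array() calls this to convert MyColumn to an Arrow array,
+           # passing through any explicitly requested type.
+           return pa.array(self.values, type=type)
+
+   arr = pa.array(MyColumn([1, 2, 3]))
+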
+Unlike the PyCapsule interface described in this document, the
+``__arrow_array__`` protocol is specific to PyArrow and isn't used by other
+libraries. It is also limited to arrays and does not support schemas, tabular
+structures, or streams.
\ No newline at end of file
diff --git a/docs/source/python/extending_types.rst b/docs/source/python/extending_types.rst
index 87f04f37dc69c..b9e875ceebc74 100644
--- a/docs/source/python/extending_types.rst
+++ b/docs/source/python/extending_types.rst
@@ -21,6 +21,8 @@
 Extending pyarrow
 =================
 
+.. _arrow_array_protocol:
+
 Controlling conversion to pyarrow.Array with the ``__arrow_array__`` protocol
 -----------------------------------------------------------------------------
 
@@ -46,6 +48,13 @@ The ``__arrow_array__`` method takes an optional `type` keyword which is passed
 through from :func:`pyarrow.array`. The method is allowed to return either
 a :class:`~pyarrow.Array` or a :class:`~pyarrow.ChunkedArray`.
 
+.. note::
+
+   For a more general way to control the conversion of Python objects to Arrow
+   data, consider the :doc:`/format/CDataInterface/PyCapsuleInterface`. It is
+   not specific to PyArrow and supports converting other objects such as tables
+   and schemas.
+
 Defining extension types ("user-defined types")
 -----------------------------------------------
 
diff --git a/docs/source/python/interchange_protocol.rst b/docs/source/python/interchange_protocol.rst
index 7784d78619e6e..e293699220c27 100644
--- a/docs/source/python/interchange_protocol.rst
+++ b/docs/source/python/interchange_protocol.rst
@@ -15,6 +15,8 @@
 .. specific language governing permissions and limitations
 .. under the License.
 
+.. _pyarrow-dataframe-interchange-protocol:
+
 Dataframe Interchange Protocol
 ==============================
 
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index e36d8b2f04315..2e9750382277a 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -15,8 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCapsule_New
+
 import os
 import warnings
+from cython import sizeof
 
 
 cdef _sequence_to_array(object sequence, object mask, object size,
@@ -123,9 +126,11 @@ def array(object obj, type=None, mask=None, size=None, from_pandas=None,
 
     Parameters
     ----------
-    obj : sequence, iterable, ndarray or pandas.Series
+    obj : sequence, iterable, ndarray, pandas.Series, Arrow-compatible array
         If both type and size are specified may be a single use iterable. If
         not strongly-typed, Arrow type will be inferred for resulting array.
+        Any Arrow-compatible array that implements the Arrow PyCapsule Protocol
+        (has an ``__arrow_c_array__`` method) can be passed as well.
     type : pyarrow.DataType
         Explicit type to attempt to coerce to, otherwise will be inferred from
         the data.
@@ -241,6 +246,18 @@ def array(object obj, type=None, mask=None, size=None, from_pandas=None,
 
     if hasattr(obj, '__arrow_array__'):
         return _handle_arrow_array_protocol(obj, type, mask, size)
+    elif hasattr(obj, '__arrow_c_array__'):
+        if type is not None:
+            requested_type = type.__arrow_c_schema__()
+        else:
+            requested_type = None
+        schema_capsule, array_capsule = obj.__arrow_c_array__(requested_type)
+        out_array = Array._import_from_c_capsule(schema_capsule, array_capsule)
+        if type is not None and out_array.type != type:
+            # PyCapsule interface type coercion is best effort, so we need to
+            # check the type of the returned array and cast if necessary
+            out_array = out_array.cast(type, safe=safe, memory_pool=memory_pool)
+        return out_array
     elif _is_array_like(obj):
         if mask is not None:
             if _is_array_like(mask):
@@ -1699,6 +1716,68 @@ cdef class Array(_PandasConvertible):
                                     c_type))
         return pyarrow_wrap_array(c_array)
 
+    def __arrow_c_array__(self, requested_schema=None):
+        """
+        Get a pair of PyCapsules containing a C ArrowArray representation of the object.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a requested
+            schema. PyArrow will attempt to cast the array to this data type.
+            If None, the array will be returned as-is, with a type matching the
+            one returned by :meth:`__arrow_c_schema__()`.
+
+        Returns
+        -------
+        Tuple[PyCapsule, PyCapsule]
+            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
+            respectively.
+        """
+        cdef:
+            ArrowArray* c_array
+            ArrowSchema* c_schema
+            shared_ptr[CArray] inner_array
+
+        if requested_schema is not None:
+            target_type = DataType._import_from_c_capsule(requested_schema)
+
+            if target_type != self.type:
+                try:
+                    casted_array = _pc().cast(self, target_type, safe=True)
+                    inner_array = pyarrow_unwrap_array(casted_array)
+                except ArrowInvalid as e:
+                    raise ValueError(
+                        f"Could not cast {self.type} to requested type {target_type}: {e}"
+                    )
+            else:
+                inner_array = self.sp_array
+        else:
+            inner_array = self.sp_array
+
+        schema_capsule = alloc_c_schema(&c_schema)
+        array_capsule = alloc_c_array(&c_array)
+
+        with nogil:
+            check_status(ExportArray(deref(inner_array), c_array, c_schema))
+
+        return schema_capsule, array_capsule
+
+    @staticmethod
+    def _import_from_c_capsule(schema_capsule, array_capsule):
+        cdef:
+            ArrowSchema* c_schema
+            ArrowArray* c_array
+            shared_ptr[CArray] array
+
+        c_schema = <ArrowSchema*> PyCapsule_GetPointer(schema_capsule, 'arrow_schema')
+        c_array = <ArrowArray*> PyCapsule_GetPointer(array_capsule, 'arrow_array')
+
+        with nogil:
+            array = GetResultValue(ImportArray(c_array, c_schema))
+
+        return pyarrow_wrap_array(array)
+
 
 cdef _array_like_to_pandas(obj, options, types_mapper):
     cdef:
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ad79c0edcd8a1..fda9d4449763e 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2743,13 +2743,13 @@ cdef extern from "arrow/array/concatenate.h" namespace "arrow" nogil:
 
 cdef extern from "arrow/c/abi.h":
     cdef struct ArrowSchema:
-        pass
+        void (*release)(ArrowSchema*) noexcept nogil
 
     cdef struct ArrowArray:
-        pass
+        void (*release)(ArrowArray*) noexcept nogil
 
     cdef struct ArrowArrayStream:
-        pass
+        void (*release)(ArrowArrayStream*) noexcept nogil
 
 cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
     CStatus ExportType(CDataType&, ArrowSchema* out)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 53e521fc11468..deb3bb728aea9 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -15,9 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCapsule_New
+
 from collections import namedtuple
 import warnings
-
+from cython import sizeof
 
 cpdef enum MetadataVersion:
     V1 = CMetadataVersion_V1
@@ -815,6 +817,70 @@ cdef class RecordBatchReader(_Weakrefable):
         self.reader = c_reader
         return self
 
+    def __arrow_c_stream__(self, requested_schema=None):
+        """
+        Export to a C ArrowArrayStream PyCapsule.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule, default None
+            A PyCapsule containing a C ArrowSchema representation of the schema
+            the stream should be cast to. Currently, casting is not supported
+            and a NotImplementedError is raised if the requested schema doesn't
+            match the current schema.
+
+        Returns
+        -------
+        PyCapsule
+            A capsule containing a C ArrowArrayStream struct.
+        """
+        cdef:
+            ArrowArrayStream* c_stream
+
+        if requested_schema is not None:
+            out_schema = Schema._import_from_c_capsule(requested_schema)
+            # TODO: figure out a way to check if one schema is castable to
+            # another. Once we have that, we can perform validation here and
+            # if successful creating a wrapping reader that casts each batch.
+            if self.schema != out_schema:
+                raise NotImplementedError("Casting to requested_schema")
+
+        stream_capsule = alloc_c_stream(&c_stream)
+
+        with nogil:
+            check_status(ExportRecordBatchReader(self.reader, c_stream))
+
+        return stream_capsule
+
+    @staticmethod
+    def _import_from_c_capsule(stream):
+        """
+        Import RecordBatchReader from a C ArrowArrayStream PyCapsule.
+
+        Parameters
+        ----------
+        stream : PyCapsule
+            A capsule containing a C ArrowArrayStream struct.
+
+        Returns
+        -------
+        RecordBatchReader
+        """
+        cdef:
+            ArrowArrayStream* c_stream
+            shared_ptr[CRecordBatchReader] c_reader
+            RecordBatchReader self
+
+        c_stream = <ArrowArrayStream*>PyCapsule_GetPointer(
+            stream, 'arrow_array_stream'
+        )
+
+        with nogil:
+            c_reader = GetResultValue(ImportRecordBatchReader(c_stream))
+
+        self = RecordBatchReader.__new__(RecordBatchReader)
+        self.reader = c_reader
+        return self
+
     @staticmethod
     def from_batches(Schema schema not None, batches):
         """
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 72af5a2deea9c..bbf60416de995 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -15,8 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import warnings
+from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCapsule_New
 
+import warnings
+from cython import sizeof
 
 cdef class ChunkedArray(_PandasConvertible):
     """
@@ -2983,6 +2985,100 @@ cdef class RecordBatch(_Tabular):
                                              c_ptr, c_schema))
         return pyarrow_wrap_batch(c_batch)
 
+    def __arrow_c_array__(self, requested_schema=None):
+        """
+        Get a pair of PyCapsules containing a C ArrowArray representation of the object.
+
+        Parameters
+        ----------
+        requested_schema : PyCapsule | None
+            A PyCapsule containing a C ArrowSchema representation of a requested
+            schema. PyArrow will attempt to cast the batch to this schema.
+            If None, the batch will be returned as-is, with a schema matching the
+            one returned by :meth:`__arrow_c_schema__()`.
+
+        Returns
+        -------
+        Tuple[PyCapsule, PyCapsule]
+            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
+            respectively.
+ """ + cdef: + ArrowArray* c_array + ArrowSchema* c_schema + + if requested_schema is not None: + target_schema = Schema._import_from_c_capsule(requested_schema) + + if target_schema != self.schema: + try: + # We don't expose .cast() on RecordBatch, only on Table. + casted_batch = Table.from_batches([self]).cast( + target_schema, safe=True).to_batches()[0] + inner_batch = pyarrow_unwrap_batch(casted_batch) + except ArrowInvalid as e: + raise ValueError( + f"Could not cast {self.schema} to requested schema {target_schema}: {e}" + ) + else: + inner_batch = self.sp_batch + else: + inner_batch = self.sp_batch + + schema_capsule = alloc_c_schema(&c_schema) + array_capsule = alloc_c_array(&c_array) + + with nogil: + check_status(ExportRecordBatch(deref(inner_batch), c_array, c_schema)) + + return schema_capsule, array_capsule + + def __arrow_c_stream__(self, requested_schema=None): + """ + Export the batch as an Arrow C stream PyCapsule. + + Parameters + ---------- + requested_schema : pyarrow.lib.Schema, default None + A schema to attempt to cast the streamed data to. This is currently + unsupported and will raise an error. + + Returns + ------- + PyCapsule + """ + return Table.from_batches([self]).__arrow_c_stream__(requested_schema) + + @staticmethod + def _import_from_c_capsule(schema_capsule, array_capsule): + """ + Import RecordBatch from a pair of PyCapsules containing a C ArrowArray + and ArrowSchema, respectively. + + Parameters + ---------- + schema_capsule : PyCapsule + A PyCapsule containing a C ArrowSchema representation of the schema. + array_capsule : PyCapsule + A PyCapsule containing a C ArrowArray representation of the array. + + Returns + ------- + pyarrow.RecordBatch + """ + cdef: + ArrowSchema* c_schema + ArrowArray* c_array + shared_ptr[CRecordBatch] c_batch + + c_schema = PyCapsule_GetPointer(schema_capsule, 'arrow_schema') + c_array = PyCapsule_GetPointer(array_capsule, 'arrow_array') + + with nogil: + c_batch = GetResultValue(ImportRecordBatch(c_array, c_schema)) + + return pyarrow_wrap_batch(c_batch) + def _reconstruct_record_batch(columns, schema): """ @@ -4757,6 +4853,22 @@ cdef class Table(_Tabular): output_type=Table ) + def __arrow_c_stream__(self, requested_schema=None): + """ + Export the table as an Arrow C stream PyCapsule. + + Parameters + ---------- + requested_schema : pyarrow.lib.Schema, default None + A schema to attempt to cast the streamed data to. This is currently + unsupported and will raise an error. + + Returns + ------- + PyCapsule + """ + return self.to_reader().__arrow_c_stream__(requested_schema) + def _reconstruct_table(arrays, schema): """ @@ -4772,8 +4884,10 @@ def record_batch(data, names=None, schema=None, metadata=None): Parameters ---------- - data : pandas.DataFrame, list - A DataFrame or list of arrays or chunked arrays. + data : pandas.DataFrame, list, Arrow-compatible table + A DataFrame, list of arrays or chunked arrays, or a tabular object + implementing the Arrow PyCapsule Protocol (has an + ``__arrow_c_array__`` method). names : list, default None Column names if list of arrays passed as data. Mutually exclusive with 'schema' argument. 
@@ -4892,6 +5006,18 @@ def record_batch(data, names=None, schema=None, metadata=None):
     if isinstance(data, (list, tuple)):
         return RecordBatch.from_arrays(data, names=names, schema=schema,
                                        metadata=metadata)
+    elif hasattr(data, "__arrow_c_array__"):
+        if schema is not None:
+            requested_schema = schema.__arrow_c_schema__()
+        else:
+            requested_schema = None
+        schema_capsule, array_capsule = data.__arrow_c_array__(requested_schema)
+        batch = RecordBatch._import_from_c_capsule(schema_capsule, array_capsule)
+        if schema is not None and batch.schema != schema:
+            # __arrow_c_array__ coerces schema with best effort, so we might
+            # need to cast it if the producer wasn't able to cast to exact schema.
+            batch = Table.from_batches([batch]).cast(schema).to_batches()[0]
+        return batch
     elif _pandas_api.is_data_frame(data):
        return RecordBatch.from_pandas(data, schema=schema)
     else:
@@ -5013,6 +5139,22 @@ def table(data, names=None, schema=None, metadata=None, nthreads=None):
             raise ValueError(
                 "The 'names' argument is not valid when passing a dictionary")
         return Table.from_pydict(data, schema=schema, metadata=metadata)
+    elif hasattr(data, "__arrow_c_stream__"):
+        if schema is not None:
+            requested = schema.__arrow_c_schema__()
+        else:
+            requested = None
+        capsule = data.__arrow_c_stream__(requested)
+        reader = RecordBatchReader._import_from_c_capsule(capsule)
+        table = reader.read_all()
+        if schema is not None and table.schema != schema:
+            # __arrow_c_stream__ coerces schema with best effort, so we might
+            # need to cast it if the producer wasn't able to cast to exact schema.
+            table = table.cast(schema)
+        return table
+    elif hasattr(data, "__arrow_c_array__"):
+        batch = record_batch(data, schema=schema)
+        return Table.from_batches([batch])
     elif _pandas_api.is_data_frame(data):
         if names is not None or metadata is not None:
             raise ValueError(
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index cd565a72bc19f..2f9727922b49a 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -3336,6 +3336,24 @@ def __arrow_array__(self, type=None):
     assert result.equals(expected)
 
 
+def test_c_array_protocol():
+    class ArrayWrapper:
+        def __init__(self, data):
+            self.data = data
+
+        def __arrow_c_array__(self, requested_schema=None):
+            return self.data.__arrow_c_array__(requested_schema)
+
+    # Can roundtrip through the C array protocol
+    arr = ArrayWrapper(pa.array([1, 2, 3], type=pa.int64()))
+    result = pa.array(arr)
+    assert result == arr.data
+
+    # Will cast to requested type
+    result = pa.array(arr, type=pa.int32())
+    assert result == pa.array([1, 2, 3], type=pa.int32())
+
+
 def test_concat_array():
     concatenated = pa.concat_arrays(
         [pa.array([1, 2]), pa.array([3, 4])])
diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index 9b0e24fa46251..55bab4359bffc 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
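+# ctypes gives access to PyCapsule_IsValid via the Python C API, which has
+# no pure-Python equivalent for checking a capsule's name.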
+import ctypes
 import gc
 
 import pyarrow as pa
@@ -36,7 +37,6 @@
 needs_cffi = pytest.mark.skipif(ffi is None,
                                 reason="test needs cffi package installed")
 
-
 assert_schema_released = pytest.raises(
     ValueError, match="Cannot import released ArrowSchema")
 
@@ -47,6 +47,10 @@
     ValueError, match="Cannot import released ArrowArrayStream")
 
 
+def PyCapsule_IsValid(capsule, name):
+    return ctypes.pythonapi.PyCapsule_IsValid(ctypes.py_object(capsule), name) == 1
+
+
 class ParamExtType(pa.PyExtensionType):
 
     def __init__(self, width):
@@ -411,3 +415,123 @@ def test_imported_batch_reader_error():
                        match="Expected to be able to read 16 bytes "
                              "for message body, got 8"):
         reader_new.read_all()
+
+
+@pytest.mark.parametrize('obj', [pa.int32(), pa.field('foo', pa.int32()),
+                                 pa.schema({'foo': pa.int32()})],
+                         ids=['type', 'field', 'schema'])
+def test_roundtrip_schema_capsule(obj):
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    capsule = obj.__arrow_c_schema__()
+    assert PyCapsule_IsValid(capsule, b"arrow_schema") == 1
+    assert pa.total_allocated_bytes() > old_allocated
+    obj_out = type(obj)._import_from_c_capsule(capsule)
+    assert obj_out == obj
+
+    assert pa.total_allocated_bytes() == old_allocated
+
+    capsule = obj.__arrow_c_schema__()
+
+    assert pa.total_allocated_bytes() > old_allocated
+    del capsule
+    assert pa.total_allocated_bytes() == old_allocated
+
+
+@pytest.mark.parametrize('arr,schema_accessor,bad_type,good_type', [
+    (pa.array(['a', 'b', 'c']), lambda x: x.type, pa.int32(), pa.string()),
+    (
+        pa.record_batch([pa.array(['a', 'b', 'c'])], names=['x']),
+        lambda x: x.schema,
+        pa.schema({'x': pa.int32()}),
+        pa.schema({'x': pa.string()})
+    ),
+], ids=['array', 'record_batch'])
+def test_roundtrip_array_capsule(arr, schema_accessor, bad_type, good_type):
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    import_array = type(arr)._import_from_c_capsule
+
+    schema_capsule, capsule = arr.__arrow_c_array__()
+    assert PyCapsule_IsValid(schema_capsule, b"arrow_schema") == 1
+    assert PyCapsule_IsValid(capsule, b"arrow_array") == 1
+    arr_out = import_array(schema_capsule, capsule)
+    assert arr_out.equals(arr)
+
+    assert pa.total_allocated_bytes() > old_allocated
+    del arr_out
+
+    assert pa.total_allocated_bytes() == old_allocated
+
+    capsule = arr.__arrow_c_array__()
+
+    assert pa.total_allocated_bytes() > old_allocated
+    del capsule
+    assert pa.total_allocated_bytes() == old_allocated
+
+    with pytest.raises(ValueError,
+                       match=r"Could not cast.* string to requested .* int32"):
+        arr.__arrow_c_array__(bad_type.__arrow_c_schema__())
+
+    schema_capsule, array_capsule = arr.__arrow_c_array__(
+        good_type.__arrow_c_schema__())
+    arr_out = import_array(schema_capsule, array_capsule)
+    assert schema_accessor(arr_out) == good_type
+
+
+# TODO: implement requested_schema for stream
+@pytest.mark.parametrize('constructor', [
+    pa.RecordBatchReader.from_batches,
+    # Use a lambda because we need to re-order the parameters
+    lambda schema, batches: pa.Table.from_batches(batches, schema),
+], ids=['recordbatchreader', 'table'])
+def test_roundtrip_reader_capsule(constructor):
+    batches = make_batches()
+    schema = batches[0].schema
+
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    obj = constructor(schema, batches)
+
+    capsule = obj.__arrow_c_stream__()
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
+    assert imported_reader.schema == schema
+    imported_batches = list(imported_reader)
+    assert len(imported_batches) == len(batches)
+    for batch, expected in zip(imported_batches, batches):
+        assert batch.equals(expected)
+
+    del obj, imported_reader, batch, expected, imported_batches
+
+    assert pa.total_allocated_bytes() == old_allocated
+
+    obj = constructor(schema, batches)
+
+    # TODO: turn this to ValueError once we implement validation.
+    bad_schema = pa.schema({'ints': pa.int32()})
+    with pytest.raises(NotImplementedError):
+        obj.__arrow_c_stream__(bad_schema.__arrow_c_schema__())
+
+    # Can work with matching schema
+    matching_schema = pa.schema({'ints': pa.list_(pa.int32())})
+    capsule = obj.__arrow_c_stream__(matching_schema.__arrow_c_schema__())
+    imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
+    assert imported_reader.schema == matching_schema
+    for batch, expected in zip(imported_reader, batches):
+        assert batch.equals(expected)
+
+
+def test_roundtrip_batch_reader_capsule():
+    batch = make_batch()
+
+    capsule = batch.__arrow_c_stream__()
+    assert PyCapsule_IsValid(capsule, b"arrow_array_stream") == 1
+    imported_reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
+    assert imported_reader.schema == batch.schema
+    assert imported_reader.read_next_batch().equals(batch)
+    with pytest.raises(StopIteration):
+        imported_reader.read_next_batch()
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 6b48633b91f8e..a678f521e38d5 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -553,6 +553,93 @@ def test_recordbatch_dunder_init():
         pa.RecordBatch()
 
 
+def test_recordbatch_c_array_interface():
+    class BatchWrapper:
+        def __init__(self, batch):
+            self.batch = batch
+
+        def __arrow_c_array__(self, requested_schema=None):
+            return self.batch.__arrow_c_array__(requested_schema)
+
+    data = pa.record_batch([
+        pa.array([1, 2, 3], type=pa.int64())
+    ], names=['a'])
+    wrapper = BatchWrapper(data)
+
+    # Can roundtrip through the wrapper.
+    result = pa.record_batch(wrapper)
+    assert result == data
+
+    # Can also import with a schema that the implementer can cast to.
+    castable_schema = pa.schema([
+        pa.field('a', pa.int32())
+    ])
+    result = pa.record_batch(wrapper, schema=castable_schema)
+    expected = pa.record_batch([
+        pa.array([1, 2, 3], type=pa.int32())
+    ], names=['a'])
+    assert result == expected
+
+
+def test_table_c_array_interface():
+    class BatchWrapper:
+        def __init__(self, batch):
+            self.batch = batch
+
+        def __arrow_c_array__(self, requested_schema=None):
+            return self.batch.__arrow_c_array__(requested_schema)
+
+    data = pa.record_batch([
+        pa.array([1, 2, 3], type=pa.int64())
+    ], names=['a'])
+    wrapper = BatchWrapper(data)
+
+    # Can roundtrip through the wrapper.
+    result = pa.table(wrapper)
+    expected = pa.Table.from_batches([data])
+    assert result == expected
+
+    # Can also import with a schema that the implementer can cast to.
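+    # (Casting int64 down to int32 is valid for these small values, so the
+    # wrapped batch can satisfy the requested schema.)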
+    castable_schema = pa.schema([
+        pa.field('a', pa.int32())
+    ])
+    result = pa.table(wrapper, schema=castable_schema)
+    expected = pa.table({
+        'a': pa.array([1, 2, 3], type=pa.int32())
+    })
+    assert result == expected
+
+
+def test_table_c_stream_interface():
+    class StreamWrapper:
+        def __init__(self, batches):
+            self.batches = batches
+
+        def __arrow_c_stream__(self, requested_schema=None):
+            reader = pa.RecordBatchReader.from_batches(
+                self.batches[0].schema, self.batches)
+            return reader.__arrow_c_stream__(requested_schema)
+
+    data = [
+        pa.record_batch([pa.array([1, 2, 3], type=pa.int64())], names=['a']),
+        pa.record_batch([pa.array([4, 5, 6], type=pa.int64())], names=['a'])
+    ]
+    wrapper = StreamWrapper(data)
+
+    # Can roundtrip through the wrapper.
+    result = pa.table(wrapper)
+    expected = pa.Table.from_batches(data)
+    assert result == expected
+
+    # Passing schema works if already that schema
+    result = pa.table(wrapper, schema=data[0].schema)
+    assert result == expected
+
+    # If schema doesn't match, raises NotImplementedError
+    with pytest.raises(NotImplementedError):
+        pa.table(wrapper, schema=pa.schema([pa.field('a', pa.int32())]))
+
+
 def test_recordbatch_itercolumns():
     data = [
         pa.array(range(5), type='int16'),
diff --git a/python/pyarrow/tests/test_types.py b/python/pyarrow/tests/test_types.py
index 660765f3361ae..16343eae61245 100644
--- a/python/pyarrow/tests/test_types.py
+++ b/python/pyarrow/tests/test_types.py
@@ -1225,3 +1225,17 @@ def test_types_come_back_with_specific_type():
     schema = pa.schema([pa.field("field_name", arrow_type)])
     type_back = schema.field("field_name").type
     assert type(type_back) is type(arrow_type)
+
+
+def test_schema_import_c_schema_interface():
+    class Wrapper:
+        def __init__(self, schema):
+            self.schema = schema
+
+        def __arrow_c_schema__(self):
+            return self.schema.__arrow_c_schema__()
+
+    schema = pa.schema([pa.field("field_name", pa.int32())])
+    wrapped_schema = Wrapper(schema)
+
+    assert pa.schema(wrapped_schema) == schema
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 764cb8e7b5d8b..d394b803e7fc2 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer
+from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCapsule_New, PyCapsule_IsValid
 
 import atexit
 from collections.abc import Mapping
@@ -23,7 +23,7 @@
 import pickle
 import re
 import sys
 import warnings
-
+from cython import sizeof
 
 # These are imprecise because the type (in pandas 0.x) depends on the presence
 # of nulls
@@ -357,6 +357,46 @@ cdef class DataType(_Weakrefable):
                                           _as_c_pointer(in_ptr)))
         return pyarrow_wrap_data_type(result)
 
+    def __arrow_c_schema__(self):
+        """
+        Export to an ArrowSchema PyCapsule.
+
+        Unlike _export_to_c, this will not leak memory if the capsule is not used.
+        """
+        cdef ArrowSchema* c_schema
+        capsule = alloc_c_schema(&c_schema)
+
+        with nogil:
+            check_status(ExportType(deref(self.type), c_schema))
+
+        return capsule
+
+    @staticmethod
+    def _import_from_c_capsule(schema):
+        """
+        Import a DataType from an ArrowSchema PyCapsule.
+
+        Parameters
+        ----------
+        schema : PyCapsule
+            A valid PyCapsule with name 'arrow_schema' containing an
+            ArrowSchema pointer.
+        """
+        cdef:
+            ArrowSchema* c_schema
+            shared_ptr[CDataType] c_type
+
+        if not PyCapsule_IsValid(schema, 'arrow_schema'):
+            raise TypeError(
+                "Not an ArrowSchema object"
+            )
+        c_schema = <ArrowSchema*> PyCapsule_GetPointer(schema, 'arrow_schema')
+
+        with nogil:
+            c_type = GetResultValue(ImportType(c_schema))
+
+        return pyarrow_wrap_data_type(c_type)
+
 
 cdef class DictionaryMemo(_Weakrefable):
     """
@@ -2369,6 +2409,46 @@ cdef class Field(_Weakrefable):
             result = GetResultValue(ImportField(
                                     c_ptr))
         return pyarrow_wrap_field(result)
 
+    def __arrow_c_schema__(self):
+        """
+        Export to an ArrowSchema PyCapsule.
+
+        Unlike _export_to_c, this will not leak memory if the capsule is not used.
+        """
+        cdef ArrowSchema* c_schema
+        capsule = alloc_c_schema(&c_schema)
+
+        with nogil:
+            check_status(ExportField(deref(self.field), c_schema))
+
+        return capsule
+
+    @staticmethod
+    def _import_from_c_capsule(schema):
+        """
+        Import a Field from an ArrowSchema PyCapsule.
+
+        Parameters
+        ----------
+        schema : PyCapsule
+            A valid PyCapsule with name 'arrow_schema' containing an
+            ArrowSchema pointer.
+        """
+        cdef:
+            ArrowSchema* c_schema
+            shared_ptr[CField] c_field
+
+        if not PyCapsule_IsValid(schema, 'arrow_schema'):
+            raise ValueError(
+                "Not an ArrowSchema object"
+            )
+        c_schema = <ArrowSchema*> PyCapsule_GetPointer(schema, 'arrow_schema')
+
+        with nogil:
+            c_field = GetResultValue(ImportField(c_schema))
+
+        return pyarrow_wrap_field(c_field)
+
 
 cdef class Schema(_Weakrefable):
     """
@@ -3153,6 +3233,45 @@ cdef class Schema(_Weakrefable):
     def __repr__(self):
         return self.__str__()
 
+    def __arrow_c_schema__(self):
+        """
+        Export to an ArrowSchema PyCapsule.
+
+        Unlike _export_to_c, this will not leak memory if the capsule is not used.
+        """
+        cdef ArrowSchema* c_schema
+        capsule = alloc_c_schema(&c_schema)
+
+        with nogil:
+            check_status(ExportSchema(deref(self.schema), c_schema))
+
+        return capsule
+
+    @staticmethod
+    def _import_from_c_capsule(schema):
+        """
+        Import a Schema from an ArrowSchema PyCapsule.
+
+        Parameters
+        ----------
+        schema : PyCapsule
+            A valid PyCapsule with name 'arrow_schema' containing an
+            ArrowSchema pointer.
+        """
+        cdef:
+            ArrowSchema* c_schema
+
+        if not PyCapsule_IsValid(schema, 'arrow_schema'):
+            raise ValueError(
+                "Not an ArrowSchema object"
+            )
+        c_schema = <ArrowSchema*> PyCapsule_GetPointer(schema, 'arrow_schema')
+
+        with nogil:
+            result = GetResultValue(ImportSchema(c_schema))
+
+        return pyarrow_wrap_schema(result)
+
 
 def unify_schemas(schemas, *, promote_options="default"):
     """
@@ -4930,6 +5049,8 @@ def schema(fields, metadata=None):
     Parameters
     ----------
     fields : iterable of Fields or tuples, or mapping of strings to DataTypes
+        Can also pass an object that implements the Arrow PyCapsule Protocol
+        for schemas (has an ``__arrow_c_schema__`` method).
     metadata : dict, default None
         Keys and values must be coercible to bytes.
@@ -4969,6 +5090,8 @@ def schema(fields, metadata=None):
 
     if isinstance(fields, Mapping):
         fields = fields.items()
+    elif hasattr(fields, "__arrow_c_schema__"):
+        return Schema._import_from_c_capsule(fields.__arrow_c_schema__())
 
     for item in fields:
         if isinstance(item, tuple):
@@ -5103,3 +5226,61 @@ def _unregister_py_extension_types():
     _register_py_extension_type()
 
 atexit.register(_unregister_py_extension_types)
+
+
+#
+# PyCapsule export utilities
+#
+
+cdef void pycapsule_schema_deleter(object schema_capsule) noexcept:
+    cdef ArrowSchema* schema = <ArrowSchema*>PyCapsule_GetPointer(
+        schema_capsule, 'arrow_schema'
+    )
+    if schema.release != NULL:
+        schema.release(schema)
+
+    free(schema)
+
+cdef object alloc_c_schema(ArrowSchema** c_schema) noexcept:
+    c_schema[0] = <ArrowSchema*> malloc(sizeof(ArrowSchema))
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_schema[0].release = NULL
+    return PyCapsule_New(c_schema[0], 'arrow_schema', &pycapsule_schema_deleter)
+
+
+cdef void pycapsule_array_deleter(object array_capsule) noexcept:
+    cdef:
+        ArrowArray* array
+    # Do not invoke the deleter on a used/moved capsule
+    array = <ArrowArray*>cpython.PyCapsule_GetPointer(
+        array_capsule, 'arrow_array'
+    )
+    if array.release != NULL:
+        array.release(array)
+
+    free(array)
+
+cdef object alloc_c_array(ArrowArray** c_array) noexcept:
+    c_array[0] = <ArrowArray*> malloc(sizeof(ArrowArray))
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_array[0].release = NULL
+    return PyCapsule_New(c_array[0], 'arrow_array', &pycapsule_array_deleter)
+
+
+cdef void pycapsule_stream_deleter(object stream_capsule) noexcept:
+    cdef:
+        ArrowArrayStream* stream
+    # Do not invoke the deleter on a used/moved capsule
+    stream = <ArrowArrayStream*>PyCapsule_GetPointer(
+        stream_capsule, 'arrow_array_stream'
+    )
+    if stream.release != NULL:
+        stream.release(stream)
+
+    free(stream)
+
+cdef object alloc_c_stream(ArrowArrayStream** c_stream) noexcept:
+    c_stream[0] = <ArrowArrayStream*> malloc(sizeof(ArrowArrayStream))
+    # Ensure the capsule destructor doesn't call a random release pointer
+    c_stream[0].release = NULL
+    return PyCapsule_New(c_stream[0], 'arrow_array_stream', &pycapsule_stream_deleter)