Skip to content

Commit

Permalink
apacheGH-38607: [Python] Disable PyExtensionType autoload (apache#38608)
Browse files Browse the repository at this point in the history
### Rationale for this change

PyExtensionType autoload is really a misfeature. It creates PyArrow-specific extension types, though using ExtensionType is almost the same complexity while allowing deserialization from non-PyArrow software.

### What changes are included in this PR?

* Disable PyExtensionType autoloading and deprecate PyExtensionType instantiation.
* Update the docs to emphasize ExtensionType.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* Closes: apache#38607

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Raúl Cumplido <raulcumplido@gmail.com>
  • Loading branch information
pitrou authored Nov 6, 2023
1 parent 0bc75b7 commit c73cb13
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 245 deletions.
149 changes: 58 additions & 91 deletions docs/source/python/extending_types.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,34 +68,43 @@ message).
See the :ref:`format_metadata_extension_types` section of the metadata
specification for more details.

Pyarrow allows you to define such extension types from Python.

There are currently two ways:

* Subclassing :class:`PyExtensionType`: the (de)serialization is based on pickle.
This is a good option for an extension type that is only used from Python.
* Subclassing :class:`ExtensionType`: this allows to give a custom
Python-independent name and serialized metadata, that can potentially be
recognized by other (non-Python) Arrow implementations such as PySpark.
Pyarrow allows you to define such extension types from Python by subclassing
:class:`ExtensionType` and giving the derived class its own extension name
and serialization mechanism. The extension name and serialized metadata
can potentially be recognized by other (non-Python) Arrow implementations
such as PySpark.

For example, we could define a custom UUID type for 128-bit numbers which can
be represented as ``FixedSizeBinary`` type with 16 bytes.
Using the first approach, we create a ``UuidType`` subclass, and implement the
``__reduce__`` method to ensure the class can be properly pickled::
be represented as ``FixedSizeBinary`` type with 16 bytes::

class UuidType(pa.PyExtensionType):
class UuidType(pa.ExtensionType):

def __init__(self):
pa.PyExtensionType.__init__(self, pa.binary(16))
super().__init__(pa.binary(16), "my_package.uuid")

def __arrow_ext_serialize__(self):
# Since we don't have a parameterized type, we don't need extra
# metadata to be deserialized
return b''

def __reduce__(self):
return UuidType, ()
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Sanity checks, not required but illustrate the method signature.
assert storage_type == pa.binary(16)
assert serialized == b''
# Return an instance of this subclass given the serialized
# metadata.
return UuidType()

The special methods ``__arrow_ext_serialize__`` and ``__arrow_ext_deserialize__``
define the serialization of an extension type instance. For non-parametric
types such as the above, the serialization payload can be left empty.

This can now be used to create arrays and tables holding the extension type::

>>> uuid_type = UuidType()
>>> uuid_type.extension_name
'arrow.py_extension_type'
'my_package.uuid'
>>> uuid_type.storage_type
FixedSizeBinaryType(fixed_size_binary[16])

Expand All @@ -112,8 +121,11 @@ This can now be used to create arrays and tables holding the extension type::
]

This array can be included in RecordBatches, sent over IPC and received in
another Python process. The custom UUID type will be preserved there, as long
as the definition of the class is available (the type can be unpickled).
another Python process. The receiving process must explicitly register the
extension type for deserialization, otherwise it will fall back to the
storage type::

>>> pa.register_extension_type(UuidType())

For example, creating a RecordBatch and writing it to a stream using the
IPC protocol::
Expand All @@ -129,43 +141,12 @@ and then reading it back yields the proper type::
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column('ext').type
UuidType(extension<arrow.py_extension_type>)

We can define the same type using the other option::

class UuidType(pa.ExtensionType):

def __init__(self):
pa.ExtensionType.__init__(self, pa.binary(16), "my_package.uuid")

def __arrow_ext_serialize__(self):
# since we don't have a parameterized type, we don't need extra
# metadata to be deserialized
return b''

@classmethod
def __arrow_ext_deserialize__(self, storage_type, serialized):
# return an instance of this subclass given the serialized
# metadata.
return UuidType()

This is a slightly longer implementation (you need to implement the special
methods ``__arrow_ext_serialize__`` and ``__arrow_ext_deserialize__``), and the
extension type needs to be registered to be received through IPC (using
:func:`register_extension_type`), but it has
now a unique name::

>>> uuid_type = UuidType()
>>> uuid_type.extension_name
'my_package.uuid'

>>> pa.register_extension_type(uuid_type)
UuidType(FixedSizeBinaryType(fixed_size_binary[16]))

The receiving application doesn't need to be Python but can still recognize
the extension type as a "uuid" type, if it has implemented its own extension
type to receive it.
If the type is not registered in the receiving application, it will fall back
to the storage type.
the extension type as a "my_package.uuid" type, if it has implemented its own
extension type to receive it. If the type is not registered in the receiving
application, it will fall back to the storage type.

Parameterized extension type
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -187,7 +168,7 @@ of the given frequency since 1970.
# attributes need to be set first before calling
# super init (as that calls serialize)
self._freq = freq
pa.ExtensionType.__init__(self, pa.int64(), 'my_package.period')
super().__init__(pa.int64(), 'my_package.period')

@property
def freq(self):
Expand All @@ -198,7 +179,7 @@ of the given frequency since 1970.

@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# return an instance of this subclass given the serialized
# Return an instance of this subclass given the serialized
# metadata.
serialized = serialized.decode()
assert serialized.startswith("freq=")
Expand All @@ -209,31 +190,10 @@ Here, we ensure to store all information in the serialized metadata that is
needed to reconstruct the instance (in the ``__arrow_ext_deserialize__`` class
method), in this case the frequency string.

Note that, once created, the data type instance is considered immutable. If,
in the example above, the ``freq`` parameter would change after instantiation,
the reconstruction of the type instance after IPC will be incorrect.
Note that, once created, the data type instance is considered immutable.
In the example above, the ``freq`` parameter is therefore stored in a private
attribute with a public read-only property to access it.

Parameterized extension types are also possible using the pickle-based type
subclassing :class:`PyExtensionType`. The equivalent example for the period
data type from above would look like::

class PeriodType(pa.PyExtensionType):

def __init__(self, freq):
self._freq = freq
pa.PyExtensionType.__init__(self, pa.int64())

@property
def freq(self):
return self._freq

def __reduce__(self):
return PeriodType, (self.freq,)

Also the storage type does not need to be fixed but can be parameterized.

Custom extension array class
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand All @@ -252,12 +212,16 @@ the data as a 2-D Numpy array ``(N, 3)`` without any copy::
return self.storage.flatten().to_numpy().reshape((-1, 3))


class Point3DType(pa.PyExtensionType):
class Point3DType(pa.ExtensionType):
def __init__(self):
pa.PyExtensionType.__init__(self, pa.list_(pa.float32(), 3))
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

def __reduce__(self):
return Point3DType, ()
def __arrow_ext_serialize__(self):
return b''

@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()

def __arrow_ext_class__(self):
return Point3DArray
Expand Down Expand Up @@ -289,11 +253,8 @@ The additional methods in the extension class are then available to the user::


This array can be sent over IPC, received in another Python process, and the custom
extension array class will be preserved (as long as the definitions of the classes above
are available).

The same ``__arrow_ext_class__`` specialization can be used with custom types defined
by subclassing :class:`ExtensionType`.
extension array class will be preserved (as long as the receiving process registers
the extension type using :func:`register_extension_type` before reading the IPC data).

Custom scalar conversion
~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -304,18 +265,24 @@ If you want scalars of your custom extension type to convert to a custom type wh
For example, if we wanted the above example 3D point type to return a custom
3D point class instead of a list, we would implement::

from collections import namedtuple

Point3D = namedtuple("Point3D", ["x", "y", "z"])

class Point3DScalar(pa.ExtensionScalar):
def as_py(self) -> Point3D:
return Point3D(*self.value.as_py())

class Point3DType(pa.PyExtensionType):
class Point3DType(pa.ExtensionType):
def __init__(self):
pa.PyExtensionType.__init__(self, pa.list_(pa.float32(), 3))
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

def __reduce__(self):
return Point3DType, ()
def __arrow_ext_serialize__(self):
return b''

@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()

def __arrow_ext_scalar_class__(self):
return Point3DScalar
Expand Down
48 changes: 40 additions & 8 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.

import contextlib
import ctypes
import gc

Expand Down Expand Up @@ -51,18 +52,33 @@ def PyCapsule_IsValid(capsule, name):
return ctypes.pythonapi.PyCapsule_IsValid(ctypes.py_object(capsule), name) == 1


class ParamExtType(pa.PyExtensionType):
@contextlib.contextmanager
def registered_extension_type(ext_type):
pa.register_extension_type(ext_type)
try:
yield
finally:
pa.unregister_extension_type(ext_type.extension_name)


class ParamExtType(pa.ExtensionType):

def __init__(self, width):
self._width = width
pa.PyExtensionType.__init__(self, pa.binary(width))
super().__init__(pa.binary(width),
"pyarrow.tests.test_cffi.ParamExtType")

@property
def width(self):
return self._width

def __reduce__(self):
return ParamExtType, (self.width,)
def __arrow_ext_serialize__(self):
return str(self.width).encode()

@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
width = int(serialized.decode())
return cls(width)


def make_schema():
Expand All @@ -75,6 +91,12 @@ def make_extension_schema():
metadata={b'key1': b'value1'})


def make_extension_storage_schema():
# Should be kept in sync with make_extension_schema
return pa.schema([('ext', ParamExtType(3).storage_type)],
metadata={b'key1': b'value1'})


def make_batch():
return pa.record_batch([[[1], [2, 42]]], make_schema())

Expand Down Expand Up @@ -204,7 +226,10 @@ def test_export_import_array():
pa.Array._import_from_c(ptr_array, ptr_schema)


def check_export_import_schema(schema_factory):
def check_export_import_schema(schema_factory, expected_schema_factory=None):
if expected_schema_factory is None:
expected_schema_factory = schema_factory

c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))

Expand All @@ -215,7 +240,7 @@ def check_export_import_schema(schema_factory):
assert pa.total_allocated_bytes() > old_allocated
# Delete and recreate C++ object from exported pointer
schema_new = pa.Schema._import_from_c(ptr_schema)
assert schema_new == schema_factory()
assert schema_new == expected_schema_factory()
assert pa.total_allocated_bytes() == old_allocated
del schema_new
assert pa.total_allocated_bytes() == old_allocated
Expand All @@ -240,7 +265,13 @@ def test_export_import_schema():

@needs_cffi
def test_export_import_schema_with_extension():
check_export_import_schema(make_extension_schema)
# Extension type is unregistered => the storage type is imported
check_export_import_schema(make_extension_schema,
make_extension_storage_schema)

# Extension type is registered => the extension type is imported
with registered_extension_type(ParamExtType(1)):
check_export_import_schema(make_extension_schema)


@needs_cffi
Expand Down Expand Up @@ -319,7 +350,8 @@ def test_export_import_batch():

@needs_cffi
def test_export_import_batch_with_extension():
check_export_import_batch(make_extension_batch)
with registered_extension_type(ParamExtType(1)):
check_export_import_batch(make_extension_batch)


def _export_import_batch_reader(ptr_stream, reader_factory):
Expand Down
Loading

0 comments on commit c73cb13

Please sign in to comment.