Skip to content

Commit

Permalink
apache GH-42112: [Python] Array gracefully fails on non-cpu device (apache#42113)
Browse files Browse the repository at this point in the history

### Rationale for this change

Common `Array` APIs should not segfault or abort on non-cpu devices.

### What changes are included in this PR?

* `device_type` and `is_cpu` methods added to the `Array` class
* Any function that segfaults, aborts, or gives incorrect results on non-cpu devices now raises an exception 

### Are these changes tested?

* Unit tests added

### Are there any user-facing changes?

* `device_type` and `is_cpu` methods added to the `Array` class
* GitHub Issue: apache#42112

Lead-authored-by: Dane Pitkin <dpitkin@apache.org>
Co-authored-by: Dane Pitkin <dpitkin.oss@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
3 people authored and zanmato1984 committed Jul 9, 2024
1 parent b8b0250 commit 3b4b175
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 8 deletions.
70 changes: 62 additions & 8 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ cdef class Array(_PandasConvertible):
+"two-and-a-half"
"""
self._assert_cpu()
cdef c_string result
with nogil:
result = self.ap.Diff(deref(other.ap))
Expand All @@ -982,6 +983,7 @@ cdef class Array(_PandasConvertible):
-------
cast : Array
"""
self._assert_cpu()
return _pc().cast(self, target_type, safe=safe,
options=options, memory_pool=memory_pool)

Expand All @@ -1000,6 +1002,7 @@ cdef class Array(_PandasConvertible):
-------
view : Array
"""
self._assert_cpu()
cdef DataType type = ensure_type(target_type)
cdef shared_ptr[CArray] result
with nogil:
Expand All @@ -1022,6 +1025,7 @@ cdef class Array(_PandasConvertible):
sum : Scalar
A scalar containing the sum value.
"""
self._assert_cpu()
options = _pc().ScalarAggregateOptions(**kwargs)
return _pc().call_function('sum', [self], options)

Expand All @@ -1034,6 +1038,7 @@ cdef class Array(_PandasConvertible):
unique : Array
An array of the same data type, with deduplicated elements.
"""
self._assert_cpu()
return _pc().call_function('unique', [self])

def dictionary_encode(self, null_encoding='mask'):
Expand All @@ -1052,6 +1057,7 @@ cdef class Array(_PandasConvertible):
encoded : DictionaryArray
A dictionary-encoded version of this array.
"""
self._assert_cpu()
options = _pc().DictionaryEncodeOptions(null_encoding)
return _pc().call_function('dictionary_encode', [self], options)

Expand All @@ -1064,6 +1070,7 @@ cdef class Array(_PandasConvertible):
StructArray
An array of <input type "Values", int64 "Counts"> structs
"""
self._assert_cpu()
return _pc().call_function('value_counts', [self])

@staticmethod
Expand Down Expand Up @@ -1105,6 +1112,7 @@ cdef class Array(_PandasConvertible):
memory_pool=memory_pool)

def __reduce__(self):
self._assert_cpu()
return _restore_array, \
(_reduce_array_data(self.sp_array.get().data().get()),)

Expand Down Expand Up @@ -1172,6 +1180,7 @@ cdef class Array(_PandasConvertible):

@property
def null_count(self):
self._assert_cpu()
return self.sp_array.get().null_count()

@property
Expand All @@ -1191,9 +1200,8 @@ cdef class Array(_PandasConvertible):
The dictionary of dictionary arrays will always be counted in their
entirety even if the array only references a portion of the dictionary.
"""
cdef:
CResult[int64_t] c_size_res

self._assert_cpu()
cdef CResult[int64_t] c_size_res
with nogil:
c_size_res = ReferencedBufferSize(deref(self.ap))
size = GetResultValue(c_size_res)
Expand All @@ -1210,16 +1218,17 @@ cdef class Array(_PandasConvertible):
If a buffer is referenced multiple times then it will
only be counted once.
"""
cdef:
int64_t total_buffer_size

self._assert_cpu()
cdef int64_t total_buffer_size
total_buffer_size = TotalBufferSize(deref(self.ap))
return total_buffer_size

def __sizeof__(self):
    # Python-level object size plus the bytes held by the Arrow buffers
    # this array references (self.nbytes).
    # Guard first: computing nbytes requires CPU-accessible buffers, so
    # fail clearly on non-CPU devices instead of crashing.
    self._assert_cpu()
    return super(Array, self).__sizeof__() + self.nbytes

def __iter__(self):
    """
    Iterate over the array, yielding one Scalar per element.
    """
    # Element access reads buffer contents, which is only valid for
    # CPU-resident data; raise up front for other devices.
    self._assert_cpu()
    n = len(self)
    pos = 0
    while pos < n:
        yield self.getitem(pos)
        pos += 1

Expand Down Expand Up @@ -1252,6 +1261,8 @@ cdef class Array(_PandasConvertible):
If the array should be rendered as a single line of text
or if each element should be on its own line.
"""
self._assert_cpu()

cdef:
c_string result
PrettyPrintOptions options
Expand Down Expand Up @@ -1307,6 +1318,8 @@ cdef class Array(_PandasConvertible):
-------
bool
"""
self._assert_cpu()
other._assert_cpu()
return self.ap.Equals(deref(other.ap))

def __len__(self):
Expand All @@ -1331,6 +1344,7 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
options = _pc().NullOptions(nan_is_null=nan_is_null)
return _pc().call_function('is_null', [self], options)

Expand All @@ -1342,12 +1356,14 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
return _pc().call_function('is_nan', [self])

def is_valid(self):
    """
    Return BooleanArray indicating the non-null values.

    Returns
    -------
    array : boolean Array
    """
    # Delegates to pyarrow.compute; compute kernels require CPU data.
    self._assert_cpu()
    return _pc().is_valid(self)

def fill_null(self, fill_value):
Expand All @@ -1364,6 +1380,7 @@ cdef class Array(_PandasConvertible):
result : Array
A new array with nulls replaced by the given value.
"""
self._assert_cpu()
return _pc().fill_null(self, fill_value)

def __getitem__(self, key):
Expand All @@ -1380,12 +1397,14 @@ cdef class Array(_PandasConvertible):
-------
value : Scalar (index) or Array (slice)
"""
self._assert_cpu()
if isinstance(key, slice):
return _normalize_slice(self, key)

return self.getitem(_normalize_index(key, self.length()))

cdef getitem(self, int64_t i):
    # Internal element accessor: wraps the C++ GetScalar result for
    # index `i` (callers are expected to pass a normalized index).
    # Scalar extraction reads buffer values, hence the CPU guard.
    self._assert_cpu()
    return Scalar.wrap(GetResultValue(self.ap.GetScalar(i)))

def slice(self, offset=0, length=None):
Expand All @@ -1404,8 +1423,7 @@ cdef class Array(_PandasConvertible):
-------
sliced : RecordBatch
"""
cdef:
shared_ptr[CArray] result
cdef shared_ptr[CArray] result

if offset < 0:
raise IndexError('Offset must be non-negative')
Expand Down Expand Up @@ -1436,12 +1454,14 @@ cdef class Array(_PandasConvertible):
taken : Array
An array with the same datatype, containing the taken values.
"""
self._assert_cpu()
return _pc().take(self, indices)

def drop_null(self):
    """
    Remove missing values from an array.

    Returns
    -------
    Array
        A new array of the same type without the null entries.
    """
    # Delegates to pyarrow.compute; compute kernels require CPU data.
    self._assert_cpu()
    return _pc().drop_null(self)

def filter(self, object mask, *, null_selection_behavior='drop'):
Expand All @@ -1463,6 +1483,7 @@ cdef class Array(_PandasConvertible):
An array of the same type, with only the elements selected by
the boolean mask.
"""
self._assert_cpu()
return _pc().filter(self, mask,
null_selection_behavior=null_selection_behavior)

Expand All @@ -1488,6 +1509,7 @@ cdef class Array(_PandasConvertible):
index : Int64Scalar
The index of the value in the array (-1 if not found).
"""
self._assert_cpu()
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def sort(self, order="ascending", **kwargs):
Expand All @@ -1507,16 +1529,20 @@ cdef class Array(_PandasConvertible):
-------
result : Array
"""
self._assert_cpu()
indices = _pc().sort_indices(
self,
options=_pc().SortOptions(sort_keys=[("", order)], **kwargs)
)
return self.take(indices)

def _to_pandas(self, options, types_mapper=None, **kwargs):
    # Internal conversion hook (presumably invoked via the
    # _PandasConvertible.to_pandas machinery — confirm against base
    # class). Delegates to the shared array-like conversion path;
    # extra **kwargs are accepted but not forwarded here.
    self._assert_cpu()
    return _array_like_to_pandas(self, options, types_mapper=types_mapper)

def __array__(self, dtype=None, copy=None):
self._assert_cpu()

if copy is False:
try:
values = self.to_numpy(zero_copy_only=True)
Expand Down Expand Up @@ -1566,6 +1592,8 @@ cdef class Array(_PandasConvertible):
-------
array : numpy.ndarray
"""
self._assert_cpu()

cdef:
PyObject* out
PandasOptions c_options
Expand Down Expand Up @@ -1604,6 +1632,7 @@ cdef class Array(_PandasConvertible):
-------
lst : list
"""
self._assert_cpu()
return [x.as_py() for x in self]

def tolist(self):
Expand All @@ -1629,6 +1658,7 @@ cdef class Array(_PandasConvertible):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.ap.ValidateFull())
else:
Expand Down Expand Up @@ -1737,6 +1767,8 @@ cdef class Array(_PandasConvertible):
A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
respectively.
"""
self._assert_cpu()

cdef:
ArrowArray* c_array
ArrowSchema* c_schema
Expand Down Expand Up @@ -1885,6 +1917,28 @@ cdef class Array(_PandasConvertible):
device = GetResultValue(ExportDevice(self.sp_array))
return device.device_type, device.device_id

@property
def device_type(self):
    """
    The device type where the array resides.

    Returns
    -------
    DeviceAllocationType
    """
    return _wrap_device_allocation_type(self.sp_array.get().device_type())

@property
def is_cpu(self):
    """
    Whether the array is CPU-accessible.

    Returns
    -------
    bool
        True if the array's device type is CPU.
    """
    return self.device_type == DeviceAllocationType.CPU

cdef void _assert_cpu(self) except *:
    # Shared guard for APIs that must read buffer contents: data on a
    # non-CPU device (e.g. CUDA) cannot be dereferenced directly, so
    # raise a clear error instead of segfaulting or aborting.
    if self.sp_array.get().device_type() != CDeviceAllocationType_kCPU:
        raise NotImplementedError("Implemented only for data on CPU device")


cdef _array_like_to_pandas(obj, options, types_mapper):
cdef:
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
CDeviceAllocationType device_type()

shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ cdef class Array(_PandasConvertible):
cdef void init(self, const shared_ptr[CArray]& sp_array) except *
cdef getitem(self, int64_t i)
cdef int64_t length(self)
cdef void _assert_cpu(self) except *


cdef class Tensor(_Weakrefable):
Expand Down
Loading

0 comments on commit 3b4b175

Please sign in to comment.