[Python] Array gracefully fails on non-cpu device
danepitkin committed Jun 11, 2024
1 parent 03a960d commit 021c6df
Showing 3 changed files with 152 additions and 8 deletions.
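
To make the effect of the change concrete, here is a brief sketch (not part of the commit) of the behavior it introduces: methods that need CPU-accessible buffers now call a small _assert_cpu() guard and raise NotImplementedError when the array's data lives on another device, while device-introspection properties such as device_type and is_cpu remain usable. The non-CPU array in the comments is assumed; how one is obtained (e.g. via CUDA support) is outside this diff.

import pyarrow as pa

arr = pa.array([1, 2, 3])   # ordinary CPU-backed array
print(arr.is_cpu)           # True  -- property added by this commit
print(arr.device_type)      # DeviceAllocationType.CPU
print(arr.sum())            # 6     -- the guard passes, behavior is unchanged

# For an array whose buffers live on another device (construction not shown here),
# the same calls would now behave as follows:
# gpu_arr.device_type   # still works, e.g. DeviceAllocationType.CUDA
# gpu_arr.is_cpu        # still works: False
# gpu_arr.sum()         # raises NotImplementedError("Implemented only for data on CPU device")
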
74 changes: 66 additions & 8 deletions python/pyarrow/array.pxi
@@ -956,6 +956,7 @@ cdef class Array(_PandasConvertible):
+"two-and-a-half"
"""
self._assert_cpu()
cdef c_string result
with nogil:
result = self.ap.Diff(deref(other.ap))
@@ -982,6 +983,7 @@ cdef class Array(_PandasConvertible):
-------
cast : Array
"""
self._assert_cpu()
return _pc().cast(self, target_type, safe=safe,
options=options, memory_pool=memory_pool)

@@ -1000,6 +1002,7 @@ cdef class Array(_PandasConvertible):
-------
view : Array
"""
self._assert_cpu()
cdef DataType type = ensure_type(target_type)
cdef shared_ptr[CArray] result
with nogil:
@@ -1022,6 +1025,7 @@ cdef class Array(_PandasConvertible):
sum : Scalar
A scalar containing the sum value.
"""
self._assert_cpu()
options = _pc().ScalarAggregateOptions(**kwargs)
return _pc().call_function('sum', [self], options)

@@ -1034,6 +1038,7 @@ cdef class Array(_PandasConvertible):
unique : Array
An array of the same data type, with deduplicated elements.
"""
self._assert_cpu()
return _pc().call_function('unique', [self])

def dictionary_encode(self, null_encoding='mask'):
@@ -1052,6 +1057,7 @@ cdef class Array(_PandasConvertible):
encoded : DictionaryArray
A dictionary-encoded version of this array.
"""
self._assert_cpu()
options = _pc().DictionaryEncodeOptions(null_encoding)
return _pc().call_function('dictionary_encode', [self], options)

@@ -1064,6 +1070,7 @@ cdef class Array(_PandasConvertible):
StructArray
An array of <input type "Values", int64 "Counts"> structs
"""
self._assert_cpu()
return _pc().call_function('value_counts', [self])

@staticmethod
@@ -1101,10 +1108,12 @@ cdef class Array(_PandasConvertible):
array : pyarrow.Array or pyarrow.ChunkedArray
ChunkedArray is returned if object data overflows binary buffer.
"""
self._assert_cpu()
return array(obj, mask=mask, type=type, safe=safe, from_pandas=True,
memory_pool=memory_pool)

def __reduce__(self):
self._assert_cpu()
return _restore_array, \
(_reduce_array_data(self.sp_array.get().data().get()),)

@@ -1172,6 +1181,7 @@ cdef class Array(_PandasConvertible):

@property
def null_count(self):
self._assert_cpu()
return self.sp_array.get().null_count()

@property
@@ -1191,9 +1201,8 @@ cdef class Array(_PandasConvertible):
The dictionary of dictionary arrays will always be counted in their
entirety even if the array only references a portion of the dictionary.
"""
cdef:
CResult[int64_t] c_size_res

self._assert_cpu()
cdef CResult[int64_t] c_size_res
with nogil:
c_size_res = ReferencedBufferSize(deref(self.ap))
size = GetResultValue(c_size_res)
@@ -1210,20 +1219,22 @@ cdef class Array(_PandasConvertible):
If a buffer is referenced multiple times then it will
only be counted once.
"""
cdef:
int64_t total_buffer_size

self._assert_cpu()
cdef int64_t total_buffer_size
total_buffer_size = TotalBufferSize(deref(self.ap))
return total_buffer_size

def __sizeof__(self):
self._assert_cpu()
return super(Array, self).__sizeof__() + self.nbytes

def __iter__(self):
self._assert_cpu()
for i in range(len(self)):
yield self.getitem(i)

def __repr__(self):
self._assert_cpu()
type_format = object.__repr__(self)
return '{0}\n{1}'.format(type_format, str(self))

@@ -1252,6 +1263,8 @@ cdef class Array(_PandasConvertible):
If the array should be rendered as a single line of text
or if each element should be on its own line.
"""
self._assert_cpu()

cdef:
c_string result
PrettyPrintOptions options
@@ -1287,6 +1300,7 @@ cdef class Array(_PandasConvertible):
return self.to_string(**kwargs)

def __str__(self):
self._assert_cpu()
return self.to_string()

def __eq__(self, other):
@@ -1307,6 +1321,8 @@ cdef class Array(_PandasConvertible):
-------
bool
"""
self._assert_cpu()
other._assert_cpu()
return self.ap.Equals(deref(other.ap))

def __len__(self):
@@ -1331,6 +1347,7 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
options = _pc().NullOptions(nan_is_null=nan_is_null)
return _pc().call_function('is_null', [self], options)

@@ -1342,12 +1359,14 @@ cdef class Array(_PandasConvertible):
-------
array : boolean Array
"""
self._assert_cpu()
return _pc().call_function('is_nan', [self])

def is_valid(self):
"""
Return BooleanArray indicating the non-null values.
"""
self._assert_cpu()
return _pc().is_valid(self)

def fill_null(self, fill_value):
@@ -1364,6 +1383,7 @@ cdef class Array(_PandasConvertible):
result : Array
A new array with nulls replaced by the given value.
"""
self._assert_cpu()
return _pc().fill_null(self, fill_value)

def __getitem__(self, key):
@@ -1380,12 +1400,14 @@ cdef class Array(_PandasConvertible):
-------
value : Scalar (index) or Array (slice)
"""
self._assert_cpu()
if isinstance(key, slice):
return _normalize_slice(self, key)

return self.getitem(_normalize_index(key, self.length()))

cdef getitem(self, int64_t i):
self._assert_cpu()
return Scalar.wrap(GetResultValue(self.ap.GetScalar(i)))

def slice(self, offset=0, length=None):
@@ -1404,8 +1426,9 @@ cdef class Array(_PandasConvertible):
-------
sliced : RecordBatch
"""
cdef:
shared_ptr[CArray] result
self._assert_cpu()

cdef shared_ptr[CArray] result

if offset < 0:
raise IndexError('Offset must be non-negative')
@@ -1436,12 +1459,14 @@ cdef class Array(_PandasConvertible):
taken : Array
An array with the same datatype, containing the taken values.
"""
self._assert_cpu()
return _pc().take(self, indices)

def drop_null(self):
"""
Remove missing values from an array.
"""
self._assert_cpu()
return _pc().drop_null(self)

def filter(self, object mask, *, null_selection_behavior='drop'):
@@ -1463,6 +1488,7 @@ cdef class Array(_PandasConvertible):
An array of the same type, with only the elements selected by
the boolean mask.
"""
self._assert_cpu()
return _pc().filter(self, mask,
null_selection_behavior=null_selection_behavior)

@@ -1488,6 +1514,7 @@ cdef class Array(_PandasConvertible):
index : Int64Scalar
The index of the value in the array (-1 if not found).
"""
self._assert_cpu()
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def sort(self, order="ascending", **kwargs):
Expand All @@ -1507,16 +1534,20 @@ cdef class Array(_PandasConvertible):
-------
result : Array
"""
self._assert_cpu()
indices = _pc().sort_indices(
self,
options=_pc().SortOptions(sort_keys=[("", order)], **kwargs)
)
return self.take(indices)

def _to_pandas(self, options, types_mapper=None, **kwargs):
self._assert_cpu()
return _array_like_to_pandas(self, options, types_mapper=types_mapper)

def __array__(self, dtype=None, copy=None):
self._assert_cpu()

if copy is False:
try:
values = self.to_numpy(zero_copy_only=True)
@@ -1566,6 +1597,8 @@ cdef class Array(_PandasConvertible):
-------
array : numpy.ndarray
"""
self._assert_cpu()

cdef:
PyObject* out
PandasOptions c_options
@@ -1604,6 +1637,7 @@ cdef class Array(_PandasConvertible):
-------
lst : list
"""
self._assert_cpu()
return [x.as_py() for x in self]

def tolist(self):
@@ -1737,6 +1771,8 @@ cdef class Array(_PandasConvertible):
A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
respectively.
"""
self._assert_cpu()

cdef:
ArrowArray* c_array
ArrowSchema* c_schema
@@ -1883,6 +1919,28 @@ cdef class Array(_PandasConvertible):
device = GetResultValue(ExportDevice(self.sp_array))
return device.device_type, device.device_id

@property
def device_type(self):
"""
The device type where the array resides.

Returns
-------
DeviceAllocationType
"""
return _wrap_device_allocation_type(self.sp_array.get().device_type())

@property
def is_cpu(self):
"""
Whether the array is CPU-accessible.
"""
return self.device_type == DeviceAllocationType.CPU

def _assert_cpu(self):
if not self.is_cpu:
raise NotImplementedError("Implemented only for data on CPU device")


cdef _array_like_to_pandas(obj, options, types_mapper):
cdef:
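The pattern applied throughout array.pxi above is uniform: each method that has to read the array's buffers starts with a one-line _assert_cpu() guard, so it fails fast with NotImplementedError instead of dereferencing non-host memory. A minimal, self-contained sketch of the same idea in plain Python (illustrative only, not the pyarrow implementation):

class DeviceGuardedArray:
    """Toy stand-in for pyarrow.Array, showing the guard pattern (illustrative only)."""

    def __init__(self, values, device="cpu"):
        self._values = values
        self._device = device

    @property
    def is_cpu(self):
        # Mirrors Array.is_cpu: True only when the data is CPU-accessible.
        return self._device == "cpu"

    def _assert_cpu(self):
        # Mirrors Array._assert_cpu: refuse to touch buffers on other devices.
        if not self.is_cpu:
            raise NotImplementedError("Implemented only for data on CPU device")

    def to_pylist(self):
        self._assert_cpu()  # fail fast before reading the values
        return list(self._values)


gpu_arr = DeviceGuardedArray([1, 2, 3], device="cuda")
try:
    gpu_arr.to_pylist()
except NotImplementedError as exc:
    print(exc)  # -> Implemented only for data on CPU device

Note that device_type and is_cpu themselves skip the guard (and _assert_cpu relies on is_cpu), so they stay callable on non-CPU data to report where the data lives.
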
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
CDeviceAllocationType device_type()

shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
