Skip to content

Commit

Permalink
apacheGH-43727: [Python] RecordBatch fails gracefully on non-cpu devi…
Browse files Browse the repository at this point in the history
…ces (apache#43729)

### Rationale for this change

Throw a python exception if a RecordBatch API isn't able to be used when the memory is backed by non-cpu devices.

### What changes are included in this PR?

* Assert the device is CPU for APIs that only support CPU

### Are these changes tested?

Pytests

### Are there any user-facing changes?

The user experiences Python exceptions instead of segfaults for unsupported APIs.
* GitHub Issue: apache#43727

Authored-by: Dane Pitkin <dpitkin@apache.org>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
danepitkin authored and zanmato1984 committed Sep 6, 2024
1 parent 86c84df commit 8b66363
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 1 deletion.
1 change: 1 addition & 0 deletions python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Testing/
*.cpp
pyarrow/lib.h
pyarrow/*_api.h
pyarrow/_cuda.h
pyarrow/_generated_version.py
cython_debug

Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ cdef class ChunkedArray(_PandasConvertible):


cdef class _Tabular(_PandasConvertible):
pass
cdef void _assert_cpu(self) except *


cdef class Table(_Tabular):
Expand Down
39 changes: 39 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,7 @@ cdef class _Tabular(_PandasConvertible):
f"one of the `{self.__class__.__name__}.from_*` functions instead.")

def __array__(self, dtype=None, copy=None):
self._assert_cpu()
if copy is False:
raise ValueError(
"Unable to avoid a copy while creating a numpy array as requested "
Expand Down Expand Up @@ -1827,6 +1828,7 @@ cdef class _Tabular(_PandasConvertible):
n_legs: [[4,100]]
animals: [["Horse","Centipede"]]
"""
self._assert_cpu()
return _pc().drop_null(self)

def field(self, i):
Expand Down Expand Up @@ -2088,6 +2090,7 @@ cdef class _Tabular(_PandasConvertible):
n_legs: [[5,100,4,2,4,2]]
animal: [["Brittle stars","Centipede","Dog","Flamingo","Horse","Parrot"]]
"""
self._assert_cpu()
if isinstance(sorting, str):
sorting = [(sorting, "ascending")]

Expand Down Expand Up @@ -2133,6 +2136,7 @@ cdef class _Tabular(_PandasConvertible):
n_legs: [[4,100]]
animals: [["Horse","Centipede"]]
"""
self._assert_cpu()
return _pc().take(self, indices)

def filter(self, mask, object null_selection_behavior="drop"):
Expand Down Expand Up @@ -2202,6 +2206,7 @@ cdef class _Tabular(_PandasConvertible):
n_legs: [[2,4,null]]
animals: [["Flamingo","Horse",null]]
"""
self._assert_cpu()
if isinstance(mask, _pc().Expression):
return _pac()._filter_table(self, mask)
else:
Expand Down Expand Up @@ -2402,6 +2407,9 @@ cdef class _Tabular(_PandasConvertible):
"""
return self.add_column(self.num_columns, field_, column)

cdef void _assert_cpu(self) except *:
return


cdef class RecordBatch(_Tabular):
"""
Expand Down Expand Up @@ -2512,6 +2520,7 @@ cdef class RecordBatch(_Tabular):
return self.batch != NULL

def __reduce__(self):
self._assert_cpu()
return _reconstruct_record_batch, (self.columns, self.schema)

def validate(self, *, full=False):
Expand All @@ -2531,6 +2540,7 @@ cdef class RecordBatch(_Tabular):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.batch.ValidateFull())
else:
Expand Down Expand Up @@ -2697,6 +2707,7 @@ cdef class RecordBatch(_Tabular):
>>> batch.nbytes
116
"""
self._assert_cpu()
cdef:
CResult[int64_t] c_res_buffer

Expand Down Expand Up @@ -2726,6 +2737,7 @@ cdef class RecordBatch(_Tabular):
>>> batch.get_total_buffer_size()
120
"""
self._assert_cpu()
cdef:
int64_t total_buffer_size

Expand Down Expand Up @@ -2792,12 +2804,19 @@ cdef class RecordBatch(_Tabular):
shared_ptr[CRecordBatch] c_batch
Field c_field
Array c_arr
CDeviceAllocationType device_type = self.sp_batch.get().device_type()

if isinstance(column, Array):
c_arr = column
else:
c_arr = array(column)

if device_type != c_arr.sp_array.get().device_type():
raise TypeError("The column must be allocated on the same "
"device as the RecordBatch. Got column on "
f"device {c_arr.device_type!r}, but expected "
f"{self.device_type!r}.")

if isinstance(field_, Field):
c_field = field_
else:
Expand Down Expand Up @@ -2885,12 +2904,19 @@ cdef class RecordBatch(_Tabular):
shared_ptr[CRecordBatch] c_batch
Field c_field
Array c_arr
CDeviceAllocationType device_type = self.sp_batch.get().device_type()

if isinstance(column, Array):
c_arr = column
else:
c_arr = array(column)

if device_type != c_arr.sp_array.get().device_type():
raise TypeError("The column must be allocated on the same "
"device as the RecordBatch. Got column on "
f"device {c_arr.device_type!r}, but expected "
f"{self.device_type!r}.")

if isinstance(field_, Field):
c_field = field_
else:
Expand Down Expand Up @@ -3016,6 +3042,7 @@ cdef class RecordBatch(_Tabular):
n_legs: [2,2,4,4,5,100]
animals: ["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]
"""
self._assert_cpu()
cdef shared_ptr[CBuffer] buffer
cdef CIpcWriteOptions options = CIpcWriteOptions.Defaults()
options.memory_pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -3117,6 +3144,7 @@ cdef class RecordBatch(_Tabular):
>>> batch.equals(batch_1, check_metadata=True)
False
"""
self._assert_cpu()
cdef:
CRecordBatch* this_batch = self.batch
shared_ptr[CRecordBatch] other_batch = pyarrow_unwrap_batch(other)
Expand Down Expand Up @@ -3248,6 +3276,7 @@ cdef class RecordBatch(_Tabular):
return RecordBatch.from_arrays(newcols, schema=target_schema)

def _to_pandas(self, options, **kwargs):
self._assert_cpu()
return Table.from_batches([self])._to_pandas(options, **kwargs)

@classmethod
Expand Down Expand Up @@ -3473,6 +3502,8 @@ cdef class RecordBatch(_Tabular):
"""
cdef:
shared_ptr[CRecordBatch] c_record_batch
if struct_array.sp_array.get().device_type() != CDeviceAllocationType_kCPU:
raise NotImplementedError("Implemented only for data on CPU device")
with nogil:
c_record_batch = GetResultValue(
CRecordBatch.FromStructArray(struct_array.sp_array))
Expand All @@ -3482,6 +3513,7 @@ cdef class RecordBatch(_Tabular):
"""
Convert to a struct array.
"""
self._assert_cpu()
cdef:
shared_ptr[CRecordBatch] c_record_batch
shared_ptr[CArray] c_array
Expand Down Expand Up @@ -3560,6 +3592,7 @@ cdef class RecordBatch(_Tabular):
[ 4., 40.],
[nan, nan]])
"""
self._assert_cpu()
cdef:
shared_ptr[CRecordBatch] c_record_batch
shared_ptr[CTensor] c_tensor
Expand Down Expand Up @@ -3686,6 +3719,7 @@ cdef class RecordBatch(_Tabular):
A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
respectively.
"""
self._assert_cpu()
cdef:
ArrowArray* c_array
ArrowSchema* c_schema
Expand Down Expand Up @@ -3731,6 +3765,7 @@ cdef class RecordBatch(_Tabular):
-------
PyCapsule
"""
self._assert_cpu()
return Table.from_batches([self]).__arrow_c_stream__(requested_schema)

@staticmethod
Expand Down Expand Up @@ -3943,6 +3978,10 @@ cdef class RecordBatch(_Tabular):
"""
return self.device_type == DeviceAllocationType.CPU

cdef void _assert_cpu(self) except *:
if self.sp_batch.get().device_type() != CDeviceAllocationType_kCPU:
raise NotImplementedError("Implemented only for data on CPU device")


def _reconstruct_record_batch(columns, schema):
"""
Expand Down
Loading

0 comments on commit 8b66363

Please sign in to comment.