Skip to content

Commit

Permalink
GH-43973: [Python] Table fails gracefully on non-cpu devices (#43974)
Browse files Browse the repository at this point in the history
## Rationale for this change

Table APIs should throw python exception instead of segfault if they don't support operating on non-cpu memory.

### What changes are included in this PR?

* Add is_cpu() property to Table
* Add _assert_cpu() checks to Table APIs that only support operating on cpu memory

### Are these changes tested?

* Unit tests

### Are there any user-facing changes?

No, besides receiving a friendlier error in certain scenarios.
* GitHub Issue: #43973

Lead-authored-by: Dane Pitkin <dpitkin@apache.org>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
danepitkin and jorisvandenbossche authored Sep 11, 2024
1 parent 170ea04 commit 21f5968
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 11 deletions.
2 changes: 2 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ cdef class Table(_Tabular):
cdef:
shared_ptr[CTable] sp_table
CTable* table
c_bool _is_cpu
c_bool _init_is_cpu

cdef void init(self, const shared_ptr[CTable]& table)

Expand Down
30 changes: 30 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -4180,6 +4180,7 @@ cdef class Table(_Tabular):

def __cinit__(self):
self.table = NULL
self._init_is_cpu = False

cdef void init(self, const shared_ptr[CTable]& table):
self.sp_table = table
Expand All @@ -4205,6 +4206,7 @@ cdef class Table(_Tabular):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.table.ValidateFull())
else:
Expand All @@ -4214,6 +4216,7 @@ cdef class Table(_Tabular):
def __reduce__(self):
# Reduce the columns as ChunkedArrays to avoid serializing schema
# data twice
self._assert_cpu()
columns = [col for col in self.columns]
return _reconstruct_table, (columns, self.schema)

Expand Down Expand Up @@ -4452,6 +4455,7 @@ cdef class Table(_Tabular):
a.year: [[null,2022]]
month: [[4,6]]
"""
self._assert_cpu()
cdef:
shared_ptr[CTable] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -4499,6 +4503,7 @@ cdef class Table(_Tabular):
n_legs: [[2,2,4,4,5,100]]
animals: [["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]]
"""
self._assert_cpu()
cdef:
shared_ptr[CTable] combined
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -4556,6 +4561,7 @@ cdef class Table(_Tabular):
["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"] -- indices:
[3,4,5]]
"""
self._assert_cpu()
cdef:
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
shared_ptr[CTable] c_result
Expand Down Expand Up @@ -4601,6 +4607,7 @@ cdef class Table(_Tabular):
>>> table.equals(table_1, check_metadata=True)
False
"""
self._assert_cpu()
if other is None:
return False

Expand Down Expand Up @@ -4658,6 +4665,7 @@ cdef class Table(_Tabular):
n_legs: [[2,4,5,100]]
animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
"""
self._assert_cpu()
cdef:
ChunkedArray column, casted
Field field
Expand Down Expand Up @@ -4909,6 +4917,7 @@ cdef class Table(_Tabular):
-------
ChunkedArray
"""
self._assert_cpu()
return chunked_array([
batch.to_struct_array()
for batch in self.to_batches(max_chunksize=max_chunksize)
Expand Down Expand Up @@ -5118,6 +5127,7 @@ cdef class Table(_Tabular):

def _to_pandas(self, options, categories=None, ignore_metadata=False,
types_mapper=None):
self._assert_cpu()
from pyarrow.pandas_compat import table_to_dataframe
df = table_to_dataframe(
options, self, categories,
Expand Down Expand Up @@ -5239,6 +5249,7 @@ cdef class Table(_Tabular):
>>> table.nbytes
72
"""
self._assert_cpu()
cdef:
CResult[int64_t] c_res_buffer

Expand Down Expand Up @@ -5268,6 +5279,7 @@ cdef class Table(_Tabular):
>>> table.get_total_buffer_size()
76
"""
self._assert_cpu()
cdef:
int64_t total_buffer_size

Expand Down Expand Up @@ -5576,6 +5588,7 @@ cdef class Table(_Tabular):
year: [[2020,2022,2021,2019]]
n_legs_sum: [[2,6,104,5]]
"""
self._assert_cpu()
return TableGroupBy(self, keys, use_threads=use_threads)

def join(self, right_table, keys, right_keys=None, join_type="left outer",
Expand Down Expand Up @@ -5685,6 +5698,7 @@ cdef class Table(_Tabular):
n_legs: [[100]]
animal: [["Centipede"]]
"""
self._assert_cpu()
if right_keys is None:
right_keys = keys
return _pac()._perform_join(
Expand Down Expand Up @@ -5772,6 +5786,7 @@ cdef class Table(_Tabular):
n_legs: [[null,5,null,5,null]]
animal: [[null,"Brittle stars",null,"Brittle stars",null]]
"""
self._assert_cpu()
if right_on is None:
right_on = on
if right_by is None:
Expand All @@ -5797,8 +5812,23 @@ cdef class Table(_Tabular):
-------
PyCapsule
"""
self._assert_cpu()
return self.to_reader().__arrow_c_stream__(requested_schema)

@property
def is_cpu(self):
"""
Whether all ChunkedArrays are CPU-accessible.
"""
if not self._init_is_cpu:
self._is_cpu = all(c.is_cpu for c in self.itercolumns())
self._init_is_cpu = True
return self._is_cpu

cdef void _assert_cpu(self) except *:
if not self.is_cpu:
raise NotImplementedError("Implemented only for data on CPU device")


def _reconstruct_table(arrays, schema):
"""
Expand Down
Loading

0 comments on commit 21f5968

Please sign in to comment.