diff --git a/python/.gitignore b/python/.gitignore index ce7f065412728..fbc3b192433b9 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -17,6 +17,7 @@ Testing/ *.cpp pyarrow/lib.h pyarrow/*_api.h +pyarrow/_cuda.h pyarrow/_generated_version.py cython_debug diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 5c3d981c3adc7..ad05ea31c91c6 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -516,7 +516,7 @@ cdef class ChunkedArray(_PandasConvertible): cdef class _Tabular(_PandasConvertible): - pass + cdef void _assert_cpu(self) except * cdef class Table(_Tabular): diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index fff47373cb991..9bb8623665977 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -1574,6 +1574,7 @@ cdef class _Tabular(_PandasConvertible): f"one of the `{self.__class__.__name__}.from_*` functions instead.") def __array__(self, dtype=None, copy=None): + self._assert_cpu() if copy is False: raise ValueError( "Unable to avoid a copy while creating a numpy array as requested " @@ -1827,6 +1828,7 @@ cdef class _Tabular(_PandasConvertible): n_legs: [[4,100]] animals: [["Horse","Centipede"]] """ + self._assert_cpu() return _pc().drop_null(self) def field(self, i): @@ -2088,6 +2090,7 @@ cdef class _Tabular(_PandasConvertible): n_legs: [[5,100,4,2,4,2]] animal: [["Brittle stars","Centipede","Dog","Flamingo","Horse","Parrot"]] """ + self._assert_cpu() if isinstance(sorting, str): sorting = [(sorting, "ascending")] @@ -2133,6 +2136,7 @@ cdef class _Tabular(_PandasConvertible): n_legs: [[4,100]] animals: [["Horse","Centipede"]] """ + self._assert_cpu() return _pc().take(self, indices) def filter(self, mask, object null_selection_behavior="drop"): @@ -2202,6 +2206,7 @@ cdef class _Tabular(_PandasConvertible): n_legs: [[2,4,null]] animals: [["Flamingo","Horse",null]] """ + self._assert_cpu() if isinstance(mask, _pc().Expression): return _pac()._filter_table(self, mask) else: @@ -2402,6 +2407,9 
@@ cdef class _Tabular(_PandasConvertible): """ return self.add_column(self.num_columns, field_, column) + cdef void _assert_cpu(self) except *: + return + cdef class RecordBatch(_Tabular): """ @@ -2512,6 +2520,7 @@ cdef class RecordBatch(_Tabular): return self.batch != NULL def __reduce__(self): + self._assert_cpu() return _reconstruct_record_batch, (self.columns, self.schema) def validate(self, *, full=False): @@ -2531,6 +2540,7 @@ cdef class RecordBatch(_Tabular): ArrowInvalid """ if full: + self._assert_cpu() with nogil: check_status(self.batch.ValidateFull()) else: @@ -2697,6 +2707,7 @@ cdef class RecordBatch(_Tabular): >>> batch.nbytes 116 """ + self._assert_cpu() cdef: CResult[int64_t] c_res_buffer @@ -2726,6 +2737,7 @@ cdef class RecordBatch(_Tabular): >>> batch.get_total_buffer_size() 120 """ + self._assert_cpu() cdef: int64_t total_buffer_size @@ -2792,12 +2804,19 @@ cdef class RecordBatch(_Tabular): shared_ptr[CRecordBatch] c_batch Field c_field Array c_arr + CDeviceAllocationType device_type = self.sp_batch.get().device_type() if isinstance(column, Array): c_arr = column else: c_arr = array(column) + if device_type != c_arr.sp_array.get().device_type(): + raise TypeError("The column must be allocated on the same " + "device as the RecordBatch. Got column on " + f"device {c_arr.device_type!r}, but expected " + f"{self.device_type!r}.") + if isinstance(field_, Field): c_field = field_ else: @@ -2885,12 +2904,19 @@ cdef class RecordBatch(_Tabular): shared_ptr[CRecordBatch] c_batch Field c_field Array c_arr + CDeviceAllocationType device_type = self.sp_batch.get().device_type() if isinstance(column, Array): c_arr = column else: c_arr = array(column) + if device_type != c_arr.sp_array.get().device_type(): + raise TypeError("The column must be allocated on the same " + "device as the RecordBatch. 
Got column on " + f"device {c_arr.device_type!r}, but expected " + f"{self.device_type!r}.") + if isinstance(field_, Field): c_field = field_ else: @@ -3016,6 +3042,7 @@ cdef class RecordBatch(_Tabular): n_legs: [2,2,4,4,5,100] animals: ["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"] """ + self._assert_cpu() cdef shared_ptr[CBuffer] buffer cdef CIpcWriteOptions options = CIpcWriteOptions.Defaults() options.memory_pool = maybe_unbox_memory_pool(memory_pool) @@ -3117,6 +3144,7 @@ cdef class RecordBatch(_Tabular): >>> batch.equals(batch_1, check_metadata=True) False """ + self._assert_cpu() cdef: CRecordBatch* this_batch = self.batch shared_ptr[CRecordBatch] other_batch = pyarrow_unwrap_batch(other) @@ -3248,6 +3276,7 @@ cdef class RecordBatch(_Tabular): return RecordBatch.from_arrays(newcols, schema=target_schema) def _to_pandas(self, options, **kwargs): + self._assert_cpu() return Table.from_batches([self])._to_pandas(options, **kwargs) @classmethod @@ -3473,6 +3502,8 @@ cdef class RecordBatch(_Tabular): """ cdef: shared_ptr[CRecordBatch] c_record_batch + if struct_array.sp_array.get().device_type() != CDeviceAllocationType_kCPU: + raise NotImplementedError("Implemented only for data on CPU device") with nogil: c_record_batch = GetResultValue( CRecordBatch.FromStructArray(struct_array.sp_array)) @@ -3482,6 +3513,7 @@ cdef class RecordBatch(_Tabular): """ Convert to a struct array. """ + self._assert_cpu() cdef: shared_ptr[CRecordBatch] c_record_batch shared_ptr[CArray] c_array @@ -3560,6 +3592,7 @@ cdef class RecordBatch(_Tabular): [ 4., 40.], [nan, nan]]) """ + self._assert_cpu() cdef: shared_ptr[CRecordBatch] c_record_batch shared_ptr[CTensor] c_tensor @@ -3686,6 +3719,7 @@ cdef class RecordBatch(_Tabular): A pair of PyCapsules containing a C ArrowSchema and ArrowArray, respectively. 
""" + self._assert_cpu() cdef: ArrowArray* c_array ArrowSchema* c_schema @@ -3731,6 +3765,7 @@ cdef class RecordBatch(_Tabular): ------- PyCapsule """ + self._assert_cpu() return Table.from_batches([self]).__arrow_c_stream__(requested_schema) @staticmethod @@ -3943,6 +3978,10 @@ cdef class RecordBatch(_Tabular): """ return self.device_type == DeviceAllocationType.CPU + cdef void _assert_cpu(self) except *: + if self.sp_batch.get().device_type() != CDeviceAllocationType_kCPU: + raise NotImplementedError("Implemented only for data on CPU device") + def _reconstruct_record_batch(columns, schema): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 3b60cff2d8cf2..5776598550536 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -27,6 +27,7 @@ import pytest import pyarrow as pa import pyarrow.compute as pc +from pyarrow.interchange import from_dataframe from pyarrow.vendored.version import Version @@ -3374,3 +3375,202 @@ def test_invalid_non_join_column(): with pytest.raises(pa.lib.ArrowInvalid) as excinfo: t2.join(t1, 'id', join_type='inner') assert exp_error_msg in str(excinfo.value) + + +@pytest.fixture +def cuda_context(): + cuda = pytest.importorskip("pyarrow.cuda") + return cuda.Context(0) + + +@pytest.fixture +def schema(): + return pa.schema([pa.field('c0', pa.int16()), pa.field('c1', pa.int32())]) + + +@pytest.fixture +def cpu_arrays(): + return [pa.array([1, 2, 3, 4, 5], pa.int16()), + pa.array([-10, -5, 0, 1, 10], pa.int32())] + + +@pytest.fixture +def cuda_arrays(cuda_context, cpu_arrays): + return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays] + + +@pytest.fixture +def cpu_recordbatch(cpu_arrays, schema): + return pa.record_batch(cpu_arrays, schema=schema) + + +@pytest.fixture +def cuda_recordbatch(cuda_context, cpu_recordbatch): + return cpu_recordbatch.copy_to(cuda_context.memory_manager) + + +def verify_cuda_recordbatch(batch, expected_schema): + 
batch.validate() + assert batch.device_type == pa.DeviceAllocationType.CUDA + assert batch.is_cpu is False + assert batch.num_columns == len(expected_schema.names) + assert batch.column_names == expected_schema.names + assert str(batch) in repr(batch) + for c in batch.columns: + assert c.device_type == pa.DeviceAllocationType.CUDA + assert batch.schema == expected_schema + + +def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, + cuda_arrays, schema): + verify_cuda_recordbatch(cuda_recordbatch, expected_schema=schema) + assert cuda_recordbatch.shape == (5, 2) + + # columns() test + assert len(cuda_recordbatch.columns) == 2 + + # add_column(), set_column() test + for fn in [cuda_recordbatch.add_column, cuda_recordbatch.set_column]: + col = pa.array([6, 7, 8, 9, 10], pa.int8()).copy_to(cuda_context.memory_manager) + new_batch = fn(2, 'c2', col) + assert len(new_batch.columns) == 3 + for c in new_batch.columns: + assert c.device_type == pa.DeviceAllocationType.CUDA + err_msg = ("Got column on device <DeviceAllocationType.CPU: 1>, " + "but expected <DeviceAllocationType.CUDA: 2>.") + with pytest.raises(TypeError, match=err_msg): + fn(2, 'c2', [1, 1, 1, 1, 1]) + + # remove_column() test + new_batch = cuda_recordbatch.remove_column(1) + verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) + + # drop_columns() test + new_batch = cuda_recordbatch.drop_columns(['c0', 'c1']) + assert len(new_batch.columns) == 0 + assert new_batch.device_type == pa.DeviceAllocationType.CUDA + + # select() test + new_batch = cuda_recordbatch.select(['c0']) + verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) + + # cast() test + new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())]) + with pytest.raises(NotImplementedError): + cuda_recordbatch.cast(new_schema) + + # drop_null() test + null_col = pa.array([-2, -1, 0, 1, 2], + mask=[True, False, True, False, True]).copy_to( + cuda_context.memory_manager) + cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 
'c2', null_col) + with pytest.raises(NotImplementedError): + cuda_recordbatch_with_nulls.drop_null() + + # filter() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.filter([True] * 5) + + # take() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.take([0]) + + # sort_by() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.sort_by('c0') + + # field() test + assert cuda_recordbatch.field(0) == pa.field('c0', pa.int16()) + assert cuda_recordbatch.field(1) == pa.field('c1', pa.int32()) + + # equals() test + new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager) + with pytest.raises(NotImplementedError): + assert cuda_recordbatch.equals(new_batch) is True + + # from_arrays() test + new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1']) + verify_cuda_recordbatch(new_batch, expected_schema=schema) + assert new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch) + + # from_pydict() test + new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]}) + verify_cuda_recordbatch(new_batch, expected_schema=schema) + assert new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch) + + # from_struct_array() test + fields = [schema.field(i) for i in range(len(schema.names))] + struct_array = pa.StructArray.from_arrays(cuda_arrays, fields=fields) + with pytest.raises(NotImplementedError): + pa.RecordBatch.from_struct_array(struct_array) + + # nbytes test + with pytest.raises(NotImplementedError): + assert cuda_recordbatch.nbytes + + # get_total_buffer_size() test + with pytest.raises(NotImplementedError): + assert cuda_recordbatch.get_total_buffer_size() + + # to_pydict() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.to_pydict() + + # to_pylist() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.to_pylist() + + # to_pandas() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.to_pandas() + + # 
to_tensor() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.to_tensor() + + # to_struct_array() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.to_struct_array() + + # serialize() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.serialize() + + # slice() test + new_batch = cuda_recordbatch.slice(1, 3) + verify_cuda_recordbatch(new_batch, expected_schema=schema) + assert new_batch.num_rows == 3 + cpu_batch = new_batch.copy_to(pa.default_cpu_memory_manager()) + assert cpu_batch == cpu_recordbatch.slice(1, 3) + + # replace_schema_metadata() test + new_batch = cuda_recordbatch.replace_schema_metadata({b'key': b'value'}) + verify_cuda_recordbatch(new_batch, expected_schema=schema) + assert new_batch.schema.metadata == {b'key': b'value'} + + # rename_columns() test + new_batch = cuda_recordbatch.rename_columns(['col0', 'col1']) + expected_schema = pa.schema( + [pa.field('col0', pa.int16()), pa.field('col1', pa.int32())]) + verify_cuda_recordbatch(new_batch, expected_schema=expected_schema) + + # validate() test + cuda_recordbatch.validate() + with pytest.raises(NotImplementedError): + cuda_recordbatch.validate(full=True) + + # __array__() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.__array__() + + # __arrow_c_array__() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.__arrow_c_array__() + + # __arrow_c_stream__() test + with pytest.raises(NotImplementedError): + cuda_recordbatch.__arrow_c_stream__() + + # __dataframe__() test + with pytest.raises(NotImplementedError): + from_dataframe(cuda_recordbatch.__dataframe__())