Skip to content

Commit

Permalink
Add tests
Browse files — browse the repository at this point in the history
  • Loading branch information
danepitkin committed Sep 6, 2024
1 parent 8d7ece6 commit 6cca473
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 15 deletions.
16 changes: 16 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -4206,6 +4206,7 @@ cdef class Table(_Tabular):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.table.ValidateFull())
else:
Expand All @@ -4215,6 +4216,7 @@ cdef class Table(_Tabular):
def __reduce__(self):
# Reduce the columns as ChunkedArrays to avoid serializing schema
# data twice
self._assert_cpu()
columns = [col for col in self.columns]
return _reconstruct_table, (columns, self.schema)

Expand Down Expand Up @@ -4453,6 +4455,7 @@ cdef class Table(_Tabular):
a.year: [[null,2022]]
month: [[4,6]]
"""
self._assert_cpu()
cdef:
shared_ptr[CTable] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -4500,6 +4503,7 @@ cdef class Table(_Tabular):
n_legs: [[2,2,4,4,5,100]]
animals: [["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]]
"""
self._assert_cpu()
cdef:
shared_ptr[CTable] combined
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -4557,6 +4561,7 @@ cdef class Table(_Tabular):
["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"] -- indices:
[3,4,5]]
"""
self._assert_cpu()
cdef:
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
shared_ptr[CTable] c_result
Expand Down Expand Up @@ -4602,6 +4607,7 @@ cdef class Table(_Tabular):
>>> table.equals(table_1, check_metadata=True)
False
"""
self._assert_cpu()
if other is None:
return False

Expand Down Expand Up @@ -4659,6 +4665,7 @@ cdef class Table(_Tabular):
n_legs: [[2,4,5,100]]
animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
"""
self._assert_cpu()
cdef:
ChunkedArray column, casted
Field field
Expand Down Expand Up @@ -4910,6 +4917,7 @@ cdef class Table(_Tabular):
-------
ChunkedArray
"""
self._assert_cpu()
return chunked_array([
batch.to_struct_array()
for batch in self.to_batches(max_chunksize=max_chunksize)
Expand Down Expand Up @@ -5033,6 +5041,7 @@ cdef class Table(_Tabular):
0 5 Brittle stars
1 100 Centipede
"""
self._assert_cpu()
cdef:
unique_ptr[TableBatchReader] reader
int64_t c_max_chunksize
Expand Down Expand Up @@ -5119,6 +5128,7 @@ cdef class Table(_Tabular):

def _to_pandas(self, options, categories=None, ignore_metadata=False,
types_mapper=None):
self._assert_cpu()
from pyarrow.pandas_compat import table_to_dataframe
df = table_to_dataframe(
options, self, categories,
Expand Down Expand Up @@ -5240,6 +5250,7 @@ cdef class Table(_Tabular):
>>> table.nbytes
72
"""
self._assert_cpu()
cdef:
CResult[int64_t] c_res_buffer

Expand Down Expand Up @@ -5269,6 +5280,7 @@ cdef class Table(_Tabular):
>>> table.get_total_buffer_size()
76
"""
self._assert_cpu()
cdef:
int64_t total_buffer_size

Expand Down Expand Up @@ -5577,6 +5589,7 @@ cdef class Table(_Tabular):
year: [[2020,2022,2021,2019]]
n_legs_sum: [[2,6,104,5]]
"""
self._assert_cpu()
return TableGroupBy(self, keys, use_threads=use_threads)

def join(self, right_table, keys, right_keys=None, join_type="left outer",
Expand Down Expand Up @@ -5686,6 +5699,7 @@ cdef class Table(_Tabular):
n_legs: [[100]]
animal: [["Centipede"]]
"""
self._assert_cpu()
if right_keys is None:
right_keys = keys
return _pac()._perform_join(
Expand Down Expand Up @@ -5773,6 +5787,7 @@ cdef class Table(_Tabular):
n_legs: [[null,5,null,5,null]]
animal: [[null,"Brittle stars",null,"Brittle stars",null]]
"""
self._assert_cpu()
if right_on is None:
right_on = on
if right_by is None:
Expand All @@ -5798,6 +5813,7 @@ cdef class Table(_Tabular):
-------
PyCapsule
"""
self._assert_cpu()
return self.to_reader().__arrow_c_stream__(requested_schema)

@property
Expand Down
203 changes: 188 additions & 15 deletions python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3538,7 +3538,7 @@ def test_chunked_array_non_cpu(cuda_context, cpu_chunked_array, cuda_chunked_arr

# filter() test
with pytest.raises(NotImplementedError):
cuda_chunked_array.filter([True, False, True, False, True])
cuda_chunked_array.filter([True])

# index() test
with pytest.raises(NotImplementedError):
Expand Down Expand Up @@ -3601,31 +3601,30 @@ def verify_cuda_recordbatch(batch, expected_schema):
def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
cuda_arrays, schema):
verify_cuda_recordbatch(cuda_recordbatch, expected_schema=schema)

# shape test
assert cuda_recordbatch.shape == (5, 2)

# columns() test
assert len(cuda_recordbatch.columns) == 2

# add_column(), set_column() test
for fn in [cuda_recordbatch.add_column, cuda_recordbatch.set_column]:
col = pa.array([6, 7, 8, 9, 10], pa.int8()).copy_to(cuda_context.memory_manager)
col = pa.array([1] * cuda_recordbatch.num_rows, pa.int8()).copy_to(cuda_context.memory_manager)
new_batch = fn(2, 'c2', col)
assert len(new_batch.columns) == 3
for c in new_batch.columns:
assert c.device_type == pa.DeviceAllocationType.CUDA
verify_cuda_recordbatch(new_batch, expected_schema=schema.append(pa.field('c2', pa.int8())))
err_msg = ("Got column on device <DeviceAllocationType.CPU: 1>, "
"but expected <DeviceAllocationType.CUDA: 2>.")
with pytest.raises(TypeError, match=err_msg):
fn(2, 'c2', [1, 1, 1, 1, 1])
fn(2, 'c2', [1] * cuda_recordbatch.num_rows)

# remove_column() test
new_batch = cuda_recordbatch.remove_column(1)
verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1))

# drop_columns() test
new_batch = cuda_recordbatch.drop_columns(['c0', 'c1'])
assert len(new_batch.columns) == 0
assert new_batch.device_type == pa.DeviceAllocationType.CUDA
new_batch = cuda_recordbatch.drop_columns(['c1'])
verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1))

# select() test
new_batch = cuda_recordbatch.select(['c0'])
Expand All @@ -3637,7 +3636,7 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
cuda_recordbatch.cast(new_schema)

# drop_null() test
null_col = pa.array([-2, -1, 0, 1, 2],
null_col = pa.array([1] * cuda_recordbatch.num_rows,
mask=[True, False, True, False, True]).copy_to(
cuda_context.memory_manager)
cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col)
Expand All @@ -3646,7 +3645,7 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,

# filter() test
with pytest.raises(NotImplementedError):
cuda_recordbatch.filter([True] * 5)
cuda_recordbatch.filter([True] * cuda_recordbatch.num_rows)

# take() test
with pytest.raises(NotImplementedError):
Expand Down Expand Up @@ -3754,7 +3753,181 @@ def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch,
from_dataframe(cuda_recordbatch.__dataframe__())


def test_table_non_cpu(cpu_table, cuda_table, cpu_and_cuda_table):
    """Check the ``is_cpu`` flag for CPU-only, CUDA-only, and mixed-device tables.

    Only a table whose chunks all reside in CPU memory reports ``is_cpu``;
    a table with any CUDA-resident chunk does not.
    """
    assert cpu_table.is_cpu
    assert not cuda_table.is_cpu
    assert not cpu_and_cuda_table.is_cpu
def verify_cuda_table(table, expected_schema):
    """Assert that *table* is CUDA-resident and conforms to *expected_schema*.

    Validates the table, checks the table-level device flag and schema-derived
    properties, then walks every chunk of every column and asserts each one
    reports a CUDA device.
    """
    table.validate()
    # Table-level device flag: strict identity check against False.
    assert table.is_cpu is False
    # Schema-derived shape/name properties must line up with the expectation.
    assert table.num_columns == len(expected_schema.names)
    assert table.column_names == expected_schema.names
    # repr() embeds the str() rendering of the table.
    assert str(table) in repr(table)
    # Every chunk of every column must live on the CUDA device.
    for column in table.columns:
        assert column.is_cpu is False
        for piece in column.iterchunks():
            assert piece.is_cpu is False
            assert piece.device_type == pa.DeviceAllocationType.CUDA
    assert table.schema == expected_schema


def test_table_non_cpu(cuda_context, cpu_table, cuda_table,
                       cuda_arrays, cuda_recordbatch, schema):
    """Exercise pa.Table APIs on a CUDA-backed table.

    Device-aware operations (column add/remove/select, slicing, metadata,
    construction from device arrays/batches) must succeed and keep the data
    on the CUDA device; operations that need to touch CPU memory must raise
    ``NotImplementedError`` (guarded by ``_assert_cpu`` in table.pxi).
    """
    verify_cuda_table(cuda_table, expected_schema=schema)

    # shape test
    assert cuda_table.shape == (10, 2)

    # columns() test
    assert len(cuda_table.columns) == 2

    # add_column(), set_column() test
    for fn in [cuda_table.add_column, cuda_table.set_column]:
        col = pa.array([1] * cuda_table.num_rows, pa.int8()).copy_to(
            cuda_context.memory_manager)
        new_table = fn(2, 'c2', col)
        verify_cuda_table(
            new_table,
            expected_schema=schema.append(pa.field('c2', pa.int8())))

    # remove_column() test
    new_table = cuda_table.remove_column(1)
    verify_cuda_table(new_table, expected_schema=schema.remove(1))

    # drop_columns() test
    new_table = cuda_table.drop_columns(['c1'])
    verify_cuda_table(new_table, expected_schema=schema.remove(1))

    # select() test
    new_table = cuda_table.select(['c0'])
    verify_cuda_table(new_table, expected_schema=schema.remove(1))

    # cast() test
    new_schema = pa.schema([pa.field('c0', pa.int64()),
                            pa.field('c1', pa.int64())])
    with pytest.raises(NotImplementedError):
        cuda_table.cast(new_schema)

    # drop_null() test
    null_col = pa.array([1] * cuda_table.num_rows,
                        mask=[True] * cuda_table.num_rows).copy_to(
                            cuda_context.memory_manager)
    cuda_table_with_nulls = cuda_table.add_column(2, 'c2', null_col)
    with pytest.raises(NotImplementedError):
        cuda_table_with_nulls.drop_null()

    # filter() test
    with pytest.raises(NotImplementedError):
        cuda_table.filter([True] * cuda_table.num_rows)

    # take() test
    with pytest.raises(NotImplementedError):
        cuda_table.take([0])

    # sort_by() test
    with pytest.raises(NotImplementedError):
        cuda_table.sort_by('c0')

    # field() test
    assert cuda_table.field(0) == schema.field(0)
    assert cuda_table.field(1) == schema.field(1)

    # equals() test
    with pytest.raises(NotImplementedError):
        assert cuda_table.equals(cpu_table)

    # from_arrays() test
    new_table = pa.Table.from_arrays(cuda_arrays, ['c0', 'c1'])
    verify_cuda_table(new_table, expected_schema=schema)

    # from_pydict() test
    new_table = pa.Table.from_pydict({'c0': cuda_arrays[0],
                                      'c1': cuda_arrays[1]})
    verify_cuda_table(new_table, expected_schema=schema)

    # from_struct_array() test
    fields = [schema.field(i) for i in range(len(schema.names))]
    struct_array = pa.StructArray.from_arrays(cuda_arrays, fields=fields)
    with pytest.raises(NotImplementedError):
        pa.Table.from_struct_array(struct_array)

    # from_batches() test
    new_table = pa.Table.from_batches([cuda_recordbatch, cuda_recordbatch],
                                      schema)
    verify_cuda_table(new_table, expected_schema=schema)

    # nbytes test
    with pytest.raises(NotImplementedError):
        assert cuda_table.nbytes

    # get_total_buffer_size() test
    with pytest.raises(NotImplementedError):
        assert cuda_table.get_total_buffer_size()

    # to_pydict() test
    with pytest.raises(NotImplementedError):
        cuda_table.to_pydict()

    # to_pylist() test
    with pytest.raises(NotImplementedError):
        cuda_table.to_pylist()

    # to_pandas() test
    with pytest.raises(NotImplementedError):
        cuda_table.to_pandas()

    # to_struct_array() test
    with pytest.raises(NotImplementedError):
        cuda_table.to_struct_array()

    # to_batches() test
    with pytest.raises(NotImplementedError):
        cuda_table.to_batches()

    # slice() test
    new_table = cuda_table.slice(1, 3)
    verify_cuda_table(new_table, expected_schema=schema)
    assert new_table.num_rows == 3

    # replace_schema_metadata() test
    new_table = cuda_table.replace_schema_metadata({b'key': b'value'})
    verify_cuda_table(new_table, expected_schema=schema)
    assert new_table.schema.metadata == {b'key': b'value'}

    # rename_columns() test
    new_table = cuda_table.rename_columns(['col0', 'col1'])
    expected_schema = pa.schema(
        [pa.field('col0', schema.field(0).type),
         pa.field('col1', schema.field(1).type)])
    verify_cuda_table(new_table, expected_schema=expected_schema)

    # validate() test
    cuda_table.validate()
    with pytest.raises(NotImplementedError):
        cuda_table.validate(full=True)

    # flatten() test
    with pytest.raises(NotImplementedError):
        cuda_table.flatten()

    # combine_chunks() test
    # FIX: this section previously called flatten() a second time (copy-paste
    # error) and never exercised combine_chunks(), even though the pxi change
    # adds _assert_cpu() to combine_chunks().
    with pytest.raises(NotImplementedError):
        cuda_table.combine_chunks()

    # unify_dictionaries() test
    with pytest.raises(NotImplementedError):
        cuda_table.unify_dictionaries()

    # group_by() test
    with pytest.raises(NotImplementedError):
        cuda_table.group_by('c0')

    # join() test
    with pytest.raises(NotImplementedError):
        cuda_table.join(cuda_table, 'c0')

    # join_asof() test
    with pytest.raises(NotImplementedError):
        cuda_table.join_asof(cuda_table, 'c0', 'c0', 0)

    # __array__() test
    with pytest.raises(NotImplementedError):
        cuda_table.__array__()

    # __arrow_c_stream__() test
    with pytest.raises(NotImplementedError):
        cuda_table.__arrow_c_stream__()

    # __dataframe__() test
    with pytest.raises(NotImplementedError):
        from_dataframe(cuda_table.__dataframe__())

0 comments on commit 6cca473

Please sign in to comment.