Skip to content

Commit 3919a27

Browse files
committed
ARROW-332: Add RecordBatch.to_pandas method
This makes testing and IPC data wrangling a little easier.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #165 from wesm/ARROW-332 and squashes the following commits:

5f19b97 [Wes McKinney] Add simple arrow::Array->NumPy-for-pandas conversion helper and RecordBatch.to_pandas
1 parent caa843b commit 3919a27

File tree

9 files changed

+133
-24
lines changed

9 files changed

+133
-24
lines changed

python/pyarrow/includes/pyarrow.pxd

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,11 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
5050
PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
5151
shared_ptr[CArray]* out)
5252

53-
PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
54-
PyObject** out)
53+
PyStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
54+
object py_ref, PyObject** out)
55+
56+
PyStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
57+
object py_ref, PyObject** out)
5558

5659
MemoryPool* get_memory_pool()
5760

python/pyarrow/io.pyx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,18 @@ cdef class InMemoryOutputStream(NativeFile):
230230
return result
231231

232232

233+
cdef class BufferReader(NativeFile):
    """
    Read-only NativeFile backed by the memory of an Arrow Buffer.

    The underlying CBufferReader is constructed from the buffer's data
    pointer and size. The Buffer object is stored on the instance,
    presumably so that the memory stays referenced for as long as the
    reader exists.

    Parameters
    ----------
    buffer : Buffer
        The buffer to read from. Must not be None.
    """
    cdef:
        Buffer buffer

    def __cinit__(self, Buffer buffer not None):
        # `not None` makes Cython raise TypeError for BufferReader(None)
        # instead of dereferencing a missing Buffer on the lines below.
        self.buffer = buffer
        self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
                                             buffer.buffer.get().size()))
        self.is_readonly = 1
        self.is_open = True
243+
244+
233245
def buffer_from_bytes(object obj):
234246
"""
235247
Construct an Arrow buffer from a Python bytes object

python/pyarrow/table.pyx

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ cdef class Column:
100100

101101
import pandas as pd
102102

103-
check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr))
103+
check_status(pyarrow.ConvertColumnToPandas(self.sp_column, self, &arr))
104104
return pd.Series(<object>arr, name=self.name)
105105

106106
cdef _check_nullptr(self):
@@ -233,6 +233,27 @@ cdef class RecordBatch:
233233

234234
return self.batch.Equals(deref(other.batch))
235235

236+
def to_pandas(self):
    """
    Convert the arrow::RecordBatch to a pandas DataFrame

    Each column of the batch is converted with ConvertArrayToPandas, and
    this RecordBatch is passed as the Python reference argument of that
    call.

    Returns
    -------
    pandas.DataFrame
        One column per batch column, named after the batch's column names
        and listed in batch order.
    """
    cdef:
        PyObject* np_arr
        shared_ptr[CArray] arr

    import pandas as pd

    names = []
    data = []
    for i in range(self.batch.num_columns()):
        arr = self.batch.column(i)
        check_status(pyarrow.ConvertArrayToPandas(arr, self, &np_arr))
        names.append(frombytes(self.batch.column_name(i)))
        data.append(<object> np_arr)

    # columns=names pins the column order to the batch's order instead of
    # whatever order the intermediate dict yields.
    return pd.DataFrame(dict(zip(names, data)), columns=names)
256+
236257
@classmethod
237258
def from_pandas(cls, df):
238259
"""
@@ -354,7 +375,7 @@ cdef class Table:
354375
for i in range(self.table.num_columns()):
355376
col = self.table.column(i)
356377
column = self.column(i)
357-
check_status(pyarrow.ArrowToPandas(col, column, &arr))
378+
check_status(pyarrow.ConvertColumnToPandas(col, column, &arr))
358379
names.append(frombytes(col.get().name()))
359380
data.append(<object> arr)
360381

python/pyarrow/tests/test_ipc.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io
1919

2020
import numpy as np
21+
22+
from pandas.util.testing import assert_frame_equal
2123
import pandas as pd
2224

2325
import pyarrow as A
@@ -85,17 +87,40 @@ def test_ipc_file_simple_roundtrip():
8587
helper.run()
8688

8789

90+
def test_ipc_zero_copy_numpy():
    """Round-trip a one-row frame through the in-memory IPC file path."""
    expected = pd.DataFrame({'foo': [1.5]})

    # Serialize the batch into an in-memory sink.
    record_batch = A.RecordBatch.from_pandas(expected)
    stream = arrow_io.InMemoryOutputStream()
    write_file(record_batch, stream)

    # Read it back straight from the resulting buffer.
    source = arrow_io.BufferReader(stream.get_result())
    first_batch = read_file(source)[0]

    roundtripped = pd.DataFrame(first_batch.to_pandas())
    assert_frame_equal(expected, roundtripped)
104+
105+
88106
# XXX: For benchmarking
89107

90108
def big_batch():
109+
K = 2**4
110+
N = 2**20
91111
df = pd.DataFrame(
92-
np.random.randn(2**4, 2**20).T,
93-
columns=[str(i) for i in range(2**4)]
112+
np.random.randn(K, N).T,
113+
columns=[str(i) for i in range(K)]
94114
)
95115

96116
df = pd.concat([df] * 2 ** 3, ignore_index=True)
117+
return df
118+
97119

98-
return A.RecordBatch.from_pandas(df)
120+
def write_to_memory2(batch):
    """Write `batch` with write_file into an in-memory stream.

    Returns whatever the stream's get_result() yields after the write.
    """
    stream = arrow_io.InMemoryOutputStream()
    write_file(batch, stream)
    result = stream.get_result()
    return result
99124

100125

101126
def write_to_memory(batch):
@@ -114,3 +139,12 @@ def read_file(source):
114139
reader = ipc.ArrowFileReader(source)
115140
return [reader.get_record_batch(i)
116141
for i in range(reader.num_record_batches)]
142+
143+
# df = big_batch()
144+
# batch = A.RecordBatch.from_pandas(df)
145+
# mem = write_to_memory(batch)
146+
# batches = read_file(mem)
147+
# data = batches[0].to_pandas()
148+
# rdf = pd.DataFrame(data)
149+
150+
# [x.to_pandas() for x in batches]

python/pyarrow/tests/test_table.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,47 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
import pyarrow as A
18+
import numpy as np
19+
20+
from pandas.util.testing import assert_frame_equal
21+
import pandas as pd
22+
23+
import pyarrow as pa
1924

2025

2126
def test_recordbatch_basics():
2227
data = [
23-
A.from_pylist(range(5)),
24-
A.from_pylist([-10, -5, 0, 5, 10])
28+
pa.from_pylist(range(5)),
29+
pa.from_pylist([-10, -5, 0, 5, 10])
2530
]
2631

27-
batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
32+
batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
2833

2934
assert len(batch) == 5
3035
assert batch.num_rows == 5
3136
assert batch.num_columns == len(data)
3237

3338

39+
def test_recordbatch_from_to_pandas():
    """RecordBatch.from_pandas followed by to_pandas reproduces the frame."""
    # Every column needs its own key: a repeated key in a dict literal
    # silently keeps only the last value, which would drop the uint32 column
    # from the test.
    data = pd.DataFrame({
        'c1': np.array([1, 2, 3, 4, 5], dtype='int64'),
        'c2': np.array([1, 2, 3, 4, 5], dtype='uint32'),
        'c3': np.random.randn(5),
        'c4': ['foo', 'bar', None, 'baz', 'qux'],
        'c5': [False, True, False, True, False]
    })

    batch = pa.RecordBatch.from_pandas(data)
    result = batch.to_pandas()
    assert_frame_equal(data, result)
51+
52+
3453
def test_table_basics():
3554
data = [
36-
A.from_pylist(range(5)),
37-
A.from_pylist([-10, -5, 0, 5, 10])
55+
pa.from_pylist(range(5)),
56+
pa.from_pylist([-10, -5, 0, 5, 10])
3857
]
39-
table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
58+
table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
4059
assert table.name == 'table_name'
4160
assert len(table) == 5
4261
assert table.num_rows == 5
@@ -50,15 +69,15 @@ def test_table_basics():
5069

5170
def test_table_pandas():
5271
data = [
53-
A.from_pylist(range(5)),
54-
A.from_pylist([-10, -5, 0, 5, 10])
72+
pa.from_pylist(range(5)),
73+
pa.from_pylist([-10, -5, 0, 5, 10])
5574
]
56-
table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
75+
table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
5776

5877
# TODO: Use this part once from_pandas is implemented
5978
# data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
6079
# df = pd.DataFrame(data)
61-
# A.Table.from_pandas(df)
80+
# pa.Table.from_pandas(df)
6281

6382
df = table.to_pandas()
6483
assert set(df.columns) == set(('a', 'b'))

python/src/pyarrow/adapters/pandas.cc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
#include "pyarrow/numpy_interop.h"
2323

24+
#include "pyarrow/adapters/pandas.h"
25+
2426
#include <cmath>
2527
#include <cstdint>
2628
#include <memory>
@@ -38,6 +40,7 @@ namespace pyarrow {
3840

3941
using arrow::Array;
4042
using arrow::Column;
43+
using arrow::Field;
4144
using arrow::DataType;
4245
namespace util = arrow::util;
4346

@@ -106,7 +109,7 @@ struct npy_traits<NPY_FLOAT64> {
106109

107110
template <>
108111
struct npy_traits<NPY_DATETIME> {
109-
typedef double value_type;
112+
typedef int64_t value_type;
110113
using TypeClass = arrow::TimestampType;
111114

112115
static constexpr bool supports_nulls = true;
@@ -163,6 +166,8 @@ class ArrowSerializer {
163166
Status ConvertData();
164167

165168
Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
169+
PyAcquireGIL lock;
170+
166171
PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
167172
arrow::TypePtr string_type(new arrow::StringType());
168173
arrow::StringBuilder string_builder(pool_, string_type);
@@ -197,6 +202,8 @@ class ArrowSerializer {
197202
}
198203

199204
Status ConvertBooleans(std::shared_ptr<Array>* out) {
205+
PyAcquireGIL lock;
206+
200207
PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
201208

202209
int nbytes = util::bytes_for_bits(length_);
@@ -798,7 +805,15 @@ class ArrowDeserializer {
798805
} \
799806
break;
800807

801-
Status ArrowToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
808+
// Convert a single arrow::Array to its pandas-facing Python object.
//
// The array is wrapped in a temporary Column whose field carries a placeholder
// name, and that column is handed to ConvertColumnToPandas, which does the
// actual conversion. py_ref and out are forwarded unchanged; the returned
// Status is whatever ConvertColumnToPandas returns.
Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* py_ref,
    PyObject** out) {
  // Placeholder name for the throwaway field. It is shared by every call, so
  // it is const to guarantee nothing can modify it.
  static const std::string dummy_name = "dummy";
  auto field = std::make_shared<Field>(dummy_name, arr->type());
  auto col = std::make_shared<Column>(field, arr);
  return ConvertColumnToPandas(col, py_ref, out);
}
815+
816+
Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
802817
PyObject** out) {
803818
switch(col->type()->type) {
804819
FROM_ARROW_CASE(BOOL);

python/src/pyarrow/adapters/pandas.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ namespace arrow {
3131

3232
class Array;
3333
class Column;
34+
class MemoryPool;
3435

3536
} // namespace arrow
3637

@@ -39,7 +40,11 @@ namespace pyarrow {
3940
class Status;
4041

4142
PYARROW_EXPORT
42-
Status ArrowToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
43+
Status ConvertArrayToPandas(const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref,
44+
PyObject** out);
45+
46+
PYARROW_EXPORT
47+
Status ConvertColumnToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
4348
PyObject** out);
4449

4550
PYARROW_EXPORT

python/src/pyarrow/common.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
120120
Py_INCREF(arr);
121121

122122
data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
123-
size_ = PyArray_SIZE(arr_);
124-
capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
123+
size_ = PyArray_SIZE(arr_) * PyArray_DESCR(arr_)->elsize;
124+
capacity_ = size_;
125125
}
126126

127127
virtual ~NumPyBuffer() {

python/src/pyarrow/io.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
8585
ARROW_RETURN_NOT_OK(CheckPyError());
8686

8787
PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
88-
Py_DECREF(py_data);
88+
Py_XDECREF(py_data);
8989
Py_XDECREF(result);
9090
ARROW_RETURN_NOT_OK(CheckPyError());
9191
return arrow::Status::OK();

0 commit comments

Comments
 (0)