diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 2fa5a7d6325..7c47f21854e 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -50,8 +50,11 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
     PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
                                  shared_ptr[CArray]* out)
 
-    PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
-                           PyObject** out)
+    PyStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
+                                  object py_ref, PyObject** out)
+
+    PyStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
+                                   object py_ref, PyObject** out)
 
     MemoryPool* get_memory_pool()
 
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 00a492fc0ba..8970e06effd 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -230,6 +230,18 @@ cdef class InMemoryOutputStream(NativeFile):
         return result
 
 
+cdef class BufferReader(NativeFile):
+    cdef:
+        Buffer buffer
+
+    def __cinit__(self, Buffer buffer):
+        self.buffer = buffer
+        self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
+                                             buffer.buffer.get().size()))
+        self.is_readonly = 1
+        self.is_open = True
+
+
 def buffer_from_bytes(object obj):
     """
     Construct an Arrow buffer from a Python bytes object
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a1cadcd1e0f..969571262ca 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -100,7 +100,7 @@ cdef class Column:
 
         import pandas as pd
 
-        check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr))
+        check_status(pyarrow.ConvertColumnToPandas(self.sp_column, self, &arr))
         return pd.Series(arr, name=self.name)
 
     cdef _check_nullptr(self):
@@ -233,6 +233,27 @@ cdef class RecordBatch:
 
         return self.batch.Equals(deref(other.batch))
 
+    def to_pandas(self):
+        """
+        Convert the arrow::RecordBatch to a pandas DataFrame
+        """
+        cdef:
+            PyObject* np_arr
+            shared_ptr[CArray] arr
+            Column column
+
+        import pandas as pd
+
+        names = []
+        data = []
+        for i in range(self.batch.num_columns()):
+            arr = self.batch.column(i)
+            check_status(pyarrow.ConvertArrayToPandas(arr, self, &np_arr))
+            names.append(frombytes(self.batch.column_name(i)))
+            data.append(<object> np_arr)
+
+        return pd.DataFrame(dict(zip(names, data)), columns=names)
+
     @classmethod
     def from_pandas(cls, df):
         """
@@ -354,7 +375,7 @@ cdef class Table:
         for i in range(self.table.num_columns()):
            col = self.table.column(i)
            column = self.column(i)
-            check_status(pyarrow.ArrowToPandas(col, column, &arr))
+            check_status(pyarrow.ConvertColumnToPandas(col, column, &arr))
             names.append(frombytes(col.get().name()))
             data.append(<object> arr)
 
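Taken together, the io.pyx and table.pyx changes above give a fully in-memory read path: an IPC file is written to an InMemoryOutputStream, wrapped by the new BufferReader without copying, and materialized as a DataFrame via RecordBatch.to_pandas(). A minimal sketch of that flow (illustration only, not part of the patch; write_file and read_file are the helpers defined in test_ipc.py below):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as arrow_io

    df = pd.DataFrame({'foo': [1.5]})
    batch = A.RecordBatch.from_pandas(df)

    sink = arrow_io.InMemoryOutputStream()
    write_file(batch, sink)                  # writer helper from test_ipc.py

    # BufferReader wraps the result buffer's memory directly; no copy is made
    reader = arrow_io.BufferReader(sink.get_result())
    roundtripped = read_file(reader)[0].to_pandas()

The new test_ipc_zero_copy_numpy below exercises exactly this path.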
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index b9e9e6ed0c4..14cbb30d5d4 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -18,6 +18,8 @@
 import io
 
 import numpy as np
+
+from pandas.util.testing import assert_frame_equal
 import pandas as pd
 
 import pyarrow as A
@@ -85,17 +87,40 @@ def test_ipc_file_simple_roundtrip():
     helper.run()
 
 
+def test_ipc_zero_copy_numpy():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = A.RecordBatch.from_pandas(df)
+    sink = arrow_io.InMemoryOutputStream()
+    write_file(batch, sink)
+    buffer = sink.get_result()
+    reader = arrow_io.BufferReader(buffer)
+
+    batches = read_file(reader)
+
+    data = batches[0].to_pandas()
+    rdf = pd.DataFrame(data)
+    assert_frame_equal(df, rdf)
+
+
 # XXX: For benchmarking
 def big_batch():
+    K = 2**4
+    N = 2**20
     df = pd.DataFrame(
-        np.random.randn(2**4, 2**20).T,
-        columns=[str(i) for i in range(2**4)]
+        np.random.randn(K, N).T,
+        columns=[str(i) for i in range(K)]
     )
     df = pd.concat([df] * 2 ** 3, ignore_index=True)
+    return df
+
 
-    return A.RecordBatch.from_pandas(df)
+def write_to_memory2(batch):
+    sink = arrow_io.InMemoryOutputStream()
+    write_file(batch, sink)
+    return sink.get_result()
 
 
 def write_to_memory(batch):
@@ -114,3 +139,12 @@ def read_file(source):
     reader = ipc.ArrowFileReader(source)
     return [reader.get_record_batch(i)
             for i in range(reader.num_record_batches)]
+
+# df = big_batch()
+# batch = A.RecordBatch.from_pandas(df)
+# mem = write_to_memory(batch)
+# batches = read_file(mem)
+# data = batches[0].to_pandas()
+# rdf = pd.DataFrame(data)
+
+# [x.to_pandas() for x in batches]
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index c5130329e02..4c9d302106a 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -15,28 +15,47 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pyarrow as A
+import numpy as np
+
+from pandas.util.testing import assert_frame_equal
+import pandas as pd
+
+import pyarrow as pa
 
 
 def test_recordbatch_basics():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
 
-    batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
+    batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
 
     assert len(batch) == 5
     assert batch.num_rows == 5
     assert batch.num_columns == len(data)
 
 
+def test_recordbatch_from_to_pandas():
+    data = pd.DataFrame({
+        'c1': np.array([1, 2, 3, 4, 5], dtype='int64'),
+        'c2': np.array([1, 2, 3, 4, 5], dtype='uint32'),
+        'c3': np.random.randn(5),
+        'c4': ['foo', 'bar', None, 'baz', 'qux'],
+        'c5': [False, True, False, True, False]
+    })
+
+    batch = pa.RecordBatch.from_pandas(data)
+    result = batch.to_pandas()
+    assert_frame_equal(data, result)
+
+
 def test_table_basics():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
     assert table.name == 'table_name'
     assert len(table) == 5
     assert table.num_rows == 5
@@ -50,15 +69,15 @@ def test_table_basics():
 
 def test_table_pandas():
     data = [
-        A.from_pylist(range(5)),
-        A.from_pylist([-10, -5, 0, 5, 10])
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
 
     # TODO: Use this part once from_pandas is implemented
     # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
     # df = pd.DataFrame(data)
-    # A.Table.from_pandas(df)
+    # pa.Table.from_pandas(df)
 
     df = table.to_pandas()
     assert set(df.columns) == set(('a', 'b'))
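The commented-out lines at the bottom of test_ipc.py sketch a manual benchmark driver. One way to actually time the round trip (a sketch, not part of the patch; it assumes the big_batch, write_to_memory, and read_file helpers from that module):

    import timeit

    def roundtrip():
        # big_batch() builds 2**20 rows x 2**4 columns, concatenated
        # 2**3 times: 2**27 (~134M) float64 values in total
        batch = A.RecordBatch.from_pandas(big_batch())
        batches = read_file(write_to_memory(batch))
        return [x.to_pandas() for x in batches]

    print(timeit.timeit(roundtrip, number=3))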
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index ae24b7ee584..b2fcd37aec9 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -21,6 +21,8 @@
 
 #include "pyarrow/numpy_interop.h"
 
+#include "pyarrow/adapters/pandas.h"
+
 #include <cmath>
 #include <cstdint>
 #include <memory>
@@ -38,6 +40,7 @@ namespace pyarrow {
 
 using arrow::Array;
 using arrow::Column;
+using arrow::Field;
 using arrow::DataType;
 
 namespace util = arrow::util;
@@ -106,7 +109,7 @@ struct npy_traits<NPY_FLOAT64> {
 
 template <>
 struct npy_traits<NPY_DATETIME> {
-  typedef double value_type;
+  typedef int64_t value_type;
   using TypeClass = arrow::TimestampType;
 
   static constexpr bool supports_nulls = true;
@@ -163,6 +166,8 @@ class ArrowSerializer {
   Status ConvertData();
 
   Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
+
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
     arrow::TypePtr string_type(new arrow::StringType());
     arrow::StringBuilder string_builder(pool_, string_type);
@@ -197,6 +202,8 @@ class ArrowSerializer {
   }
 
   Status ConvertBooleans(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
+
     PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
 
     int nbytes = util::bytes_for_bits(length_);
@@ -798,7 +805,15 @@ class ArrowDeserializer {
     }                                                      \
     break;
 
-Status ArrowToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* py_ref,
+    PyObject** out) {
+  static std::string dummy_name = "dummy";
+  auto field = std::make_shared<Field>(dummy_name, arr->type());
+  auto col = std::make_shared<Column>(field, arr);
+  return ConvertColumnToPandas(col, py_ref, out);
+}
+
+Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
     PyObject** out) {
   switch(col->type()->type) {
     FROM_ARROW_CASE(BOOL);
diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h
index c3377685bcc..141d1219e64 100644
--- a/python/src/pyarrow/adapters/pandas.h
+++ b/python/src/pyarrow/adapters/pandas.h
@@ -31,6 +31,7 @@ namespace arrow {
 
 class Array;
 class Column;
+class MemoryPool;
 
 }  // namespace arrow
 
@@ -39,7 +40,11 @@ namespace pyarrow {
 class Status;
 
 PYARROW_EXPORT
-Status ArrowToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
+Status ConvertArrayToPandas(const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref,
+    PyObject** out);
+
+PYARROW_EXPORT
+Status ConvertColumnToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
     PyObject** out);
 
 PYARROW_EXPORT
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index 96eed1654a7..50c2577b93c 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -120,8 +120,8 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
     Py_INCREF(arr);
 
     data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
-    size_ = PyArray_SIZE(arr_);
-    capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
+    size_ = PyArray_SIZE(arr_) * PyArray_DESCR(arr_)->elsize;
+    capacity_ = size_;
   }
 
   virtual ~NumPyBuffer() {
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 9879b3474bc..7bf32ffa8d2 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -85,7 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
   ARROW_RETURN_NOT_OK(CheckPyError());
 
   PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
-  Py_DECREF(py_data);
+  Py_XDECREF(py_data);
   Py_XDECREF(result);
   ARROW_RETURN_NOT_OK(CheckPyError());
   return arrow::Status::OK();
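The common.h change makes NumPyBuffer report its size in bytes rather than in elements, matching the arrow::Buffer contract that downstream consumers rely on; before the fix, a float64 array claimed only one-eighth of its real extent. The arithmetic in NumPy terms (illustration only, not part of the patch):

    import numpy as np

    a = np.zeros(100, dtype='float64')
    a.size       # 100 -- element count, what PyArray_SIZE returns
    a.itemsize   # 8   -- bytes per element, PyArray_DESCR(arr_)->elsize
    assert a.size * a.itemsize == a.nbytes  # 800 bytes, the buffer's true size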