diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc index 96dd09aa817..ac61cbc13c6 100644 --- a/cpp/src/arrow/python/pandas_convert.cc +++ b/cpp/src/arrow/python/pandas_convert.cc @@ -1102,7 +1102,7 @@ static inline PyObject* NewArray1DFromType( set_numpy_metadata(type, arrow_type, descr); return PyArray_NewFromDescr(&PyArray_Type, descr, 1, dims, nullptr, data, - NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY, nullptr); + NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE, nullptr); } class PandasBlock { diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index a61d746d652..6173299bca6 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -158,4 +158,5 @@ def deserialize_pandas(buf, nthreads=1): """ buffer_reader = pa.BufferReader(buf) reader = pa.RecordBatchFileReader(buffer_reader) - return reader.read_all().to_pandas(nthreads=nthreads) + table = reader.read_all() + return table.to_pandas(nthreads=nthreads) diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index 255b31a6751..9711b722c2f 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -102,3 +102,76 @@ def construct_metadata(df, index_levels, preserve_index): } ).encode('utf8') } + + +def table_to_blockmanager(table, nthreads=1): + import pandas.core.internals as _int + from pyarrow.compat import DatetimeTZDtype + import pyarrow.lib as lib + + block_table = table + + index_columns = [] + index_arrays = [] + index_names = [] + schema = table.schema + row_count = table.num_rows + metadata = schema.metadata + + if metadata is not None and b'pandas' in metadata: + pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8')) + index_columns = pandas_metadata['index_columns'] + + for name in index_columns: + i = schema.get_field_index(name) + if i != -1: + col = table.column(i) + index_name = (None if is_unnamed_index_level(name) + else name) + values = col.to_pandas().values + if not values.flags.writeable: + # ARROW-1054: in pandas 0.19.2, factorize will reject + # non-writeable arrays when calling MultiIndex.from_arrays + values = values.copy() + + index_arrays.append(values) + index_names.append(index_name) + block_table = block_table.remove_column( + block_table.schema.get_field_index(name) + ) + + result = lib.table_to_blocks(block_table, nthreads) + + blocks = [] + for item in result: + block_arr = item['block'] + placement = item['placement'] + if 'dictionary' in item: + cat = pd.Categorical(block_arr, + categories=item['dictionary'], + ordered=False, fastpath=True) + block = _int.make_block(cat, placement=placement, + klass=_int.CategoricalBlock, + fastpath=True) + elif 'timezone' in item: + dtype = DatetimeTZDtype('ns', tz=item['timezone']) + block = _int.make_block(block_arr, placement=placement, + klass=_int.DatetimeTZBlock, + dtype=dtype, fastpath=True) + else: + block = _int.make_block(block_arr, placement=placement) + blocks.append(block) + + if len(index_arrays) > 1: + index = pd.MultiIndex.from_arrays(index_arrays, names=index_names) + elif len(index_arrays) == 1: + index = pd.Index(index_arrays[0], name=index_names[0]) + else: + index = pd.RangeIndex(row_count) + + axes = [ + [column.name for column in block_table.itercolumns()], + index + ] + + return _int.BlockManager(blocks, axes) diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index f59a719b29e..dc26daba905 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. -import itertools import json import six diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 25a4f84cfc8..3f67ba40c73 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -559,86 +559,20 @@ cdef class RecordBatch: return pyarrow_wrap_batch(batch) -cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads): - import pandas.core.internals as _int - from pandas import RangeIndex, Categorical - from pyarrow.compat import DatetimeTZDtype - - cdef: - Table table = pyarrow_wrap_table(ctable) - Table block_table = pyarrow_wrap_table(ctable) - Schema schema = table.schema - - size_t row_count = table.num_rows - size_t total_columns = table.num_columns - - dict metadata = schema.metadata - dict pandas_metadata = None - - list index_columns = [] - list index_arrays = [] - - if metadata is not None and b'pandas' in metadata: - pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8')) - index_columns = pandas_metadata['index_columns'] - - cdef: - Column col - int64_t i - - for name in index_columns: - i = schema.get_field_index(name) - if i != -1: - col = table.column(i) - index_name = None if pdcompat.is_unnamed_index_level(name) else name - index_arrays.append( - pd.Index(col.to_pandas().values, name=index_name) - ) - block_table = block_table.remove_column( - block_table.schema.get_field_index(name) - ) - +def table_to_blocks(Table table, int nthreads): cdef: PyObject* result_obj - shared_ptr[CTable] c_block_table = block_table.sp_table + shared_ptr[CTable] c_table = table.sp_table with nogil: check_status( libarrow.ConvertTableToPandas( - c_block_table, nthreads, &result_obj + c_table, nthreads, &result_obj ) ) - result = PyObject_to_object(result_obj) - - blocks = [] - for item in result: - block_arr = item['block'] - placement = item['placement'] - if 'dictionary' in item: - cat = Categorical(block_arr, - categories=item['dictionary'], - ordered=False, fastpath=True) - block = _int.make_block(cat, placement=placement, - klass=_int.CategoricalBlock, - fastpath=True) - elif 'timezone' in item: - dtype = DatetimeTZDtype('ns', tz=item['timezone']) - block = _int.make_block(block_arr, placement=placement, - klass=_int.DatetimeTZBlock, - dtype=dtype, fastpath=True) - else: - block = _int.make_block(block_arr, placement=placement) - blocks.append(block) - - cdef list axes = [ - [column.name for column in block_table.itercolumns()], - pd.MultiIndex.from_arrays( - index_arrays - ) if index_arrays else pd.RangeIndex(row_count), - ] + return PyObject_to_object(result_obj) - return _int.BlockManager(blocks, axes) cdef class Table: @@ -829,7 +763,7 @@ cdef class Table: if nthreads is None: nthreads = cpu_count() - mgr = table_to_blockmanager(self.sp_table, nthreads) + mgr = pdcompat.table_to_blockmanager(self, nthreads) return pd.DataFrame(mgr) def to_pydict(self):