diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index d6966cdaadd..02265d0a68e 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -42,3 +42,6 @@ cdef class NativeFile:
     # suite of Arrow C++ libraries
     cdef read_handle(self, shared_ptr[ReadableFileInterface]* file)
     cdef write_handle(self, shared_ptr[OutputStream]* file)
+
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader)
+cdef get_writer(object source, shared_ptr[OutputStream]* writer)
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 6b0e3924d20..8491aa8964f 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -256,6 +256,46 @@ def buffer_from_bytes(object obj):
     result.init(buf)
     return result
 
+cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, bytes):
+        source = BytesReader(source)
+    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readonly:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if nf.is_readonly:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
 
 # ----------------------------------------------------------------------
 # HDFS IO implementation
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
index 46deb5ad0c3..abc5e1b11ec 100644
--- a/python/pyarrow/ipc.pyx
+++ b/python/pyarrow/ipc.pyx
@@ -27,7 +27,7 @@ from pyarrow.includes.libarrow_ipc cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
 from pyarrow.schema cimport Schema
 from pyarrow.table cimport RecordBatch
@@ -37,47 +37,6 @@ import pyarrow.io as io
 
 cimport cpython as cp
 
-cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
-    cdef NativeFile nf
-
-    if isinstance(source, bytes):
-        source = io.BytesReader(source)
-    elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
-        # Optimistically hope this is file-like
-        source = io.PythonFileInterface(source, mode='r')
-
-    if isinstance(source, NativeFile):
-        nf = source
-
-        # TODO: what about read-write sources (e.g. memory maps)
-        if not nf.is_readonly:
-            raise IOError('Native file is not readable')
-
-        nf.read_handle(reader)
-    else:
-        raise TypeError('Unable to read from object of type: {0}'
-                        .format(type(source)))
-
-
-cdef get_writer(source, shared_ptr[OutputStream]* writer):
-    cdef NativeFile nf
-
-    if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
-        # Optimistically hope this is file-like
-        source = io.PythonFileInterface(source, mode='w')
-
-    if isinstance(source, io.NativeFile):
-        nf = source
-
-        if nf.is_readonly:
-            raise IOError('Native file is not writeable')
-
-        nf.write_handle(writer)
-    else:
-        raise TypeError('Unable to read from object of type: {0}'
-                        .format(type(source)))
-
-
 cdef class ArrowFileWriter:
     cdef:
         shared_ptr[CFileWriter] writer
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 83fddb287a3..043ccf12d91 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error cimport check_status
 from pyarrow.io import NativeFile
 from pyarrow.table cimport Table
 
-from pyarrow.io cimport NativeFile
+from pyarrow.io cimport NativeFile, get_reader, get_writer
 
 import six
@@ -49,22 +49,27 @@ cdef class ParquetReader:
     def __cinit__(self):
         self.allocator.set_pool(default_memory_pool())
 
-    cdef open_local_file(self, file_path):
-        cdef c_string path = tobytes(file_path)
+    def open(self, source):
+        self._open(source)
 
-        # Must be in one expression to avoid calling std::move which is not
-        # possible in Cython (due to missing rvalue support)
+    cdef _open(self, object source):
+        cdef:
+            shared_ptr[ReadableFileInterface] rd_handle
+            c_string path
 
-        # TODO(wesm): ParquetFileReader::OpenFIle can throw?
-        self.reader = unique_ptr[FileReader](
-            new FileReader(default_memory_pool(),
-                           ParquetFileReader.OpenFile(path)))
+        if isinstance(source, six.string_types):
+            path = tobytes(source)
 
-    cdef open_native_file(self, NativeFile file):
-        cdef shared_ptr[ReadableFileInterface] cpp_handle
-        file.read_handle(&cpp_handle)
+            # Must be in one expression to avoid calling std::move which is not
+            # possible in Cython (due to missing rvalue support)
 
-        check_status(OpenFile(cpp_handle, &self.allocator, &self.reader))
+            # TODO(wesm): ParquetFileReader::OpenFile can throw?
+            self.reader = unique_ptr[FileReader](
+                new FileReader(default_memory_pool(),
+                               ParquetFileReader.OpenFile(path)))
+        else:
+            get_reader(source, &rd_handle)
+            check_status(OpenFile(rd_handle, &self.allocator, &self.reader))
 
     def read_all(self):
         cdef:
@@ -137,11 +142,7 @@ def read_table(source, columns=None):
     Content of the file as a table (of columns)
     """
     cdef ParquetReader reader = ParquetReader()
-
-    if isinstance(source, six.string_types):
-        reader.open_local_file(source)
-    elif isinstance(source, NativeFile):
-        reader.open_native_file(source)
+    reader._open(source)
 
     if columns is None:
         return reader.read_all()
@@ -174,7 +175,10 @@ def write_table(table, sink, chunk_size=None, version=None,
     cdef Table table_ = table
    cdef CTable* ctable_ = table_.table
     cdef shared_ptr[ParquetWriteSink] sink_
+    cdef shared_ptr[FileOutputStream] filesink_
+    cdef shared_ptr[OutputStream] general_sink
+
     cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
 
     if chunk_size is None:
@@ -232,10 +236,11 @@ def write_table(table, sink, chunk_size=None, version=None,
             raise ArrowException("Unsupport compression codec")
 
     if isinstance(sink, six.string_types):
-        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
-        sink_.reset(new ParquetWriteSink(filesink_))
-    elif isinstance(sink, NativeFile):
-        sink_.reset(new ParquetWriteSink((<NativeFile> sink).wr_file))
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(filesink_))
+    else:
+        get_writer(sink, &general_sink)
+        sink_.reset(new ParquetWriteSink(general_sink))
 
     with nogil:
         check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 841830f6fba..7c45732d345 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import io
 import pytest
 
 import pyarrow as A
@@ -132,9 +133,8 @@ def test_pandas_column_selection(tmpdir):
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
 
-@parquet
-def test_pandas_parquet_native_file_roundtrip(tmpdir):
-    size = 10000
+
+def _test_dataframe(size=10000):
     np.random.seed(0)
     df = pd.DataFrame({
         'uint8': np.arange(size, dtype=np.uint8),
@@ -149,6 +149,12 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
         'float64': np.arange(size, dtype=np.float64),
         'bool': np.random.randn(size) > 0
     })
+    return df
+
+
+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    df = _test_dataframe(10000)
     arrow_table = A.from_pandas_dataframe(df)
     imos = paio.InMemoryOutputStream()
     pq.write_table(arrow_table, imos, version="2.0")
@@ -158,6 +164,31 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
     pdt.assert_frame_equal(df, df_read)
 
 
+@parquet
+def test_pandas_parquet_pyfile_roundtrip(tmpdir):
+    filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
+    size = 5
+    df = pd.DataFrame({
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'strings': ['foo', 'bar', None, 'baz', 'qux']
+    })
+
+    arrow_table = A.from_pandas_dataframe(df)
+
+    with open(filename, 'wb') as f:
+        A.parquet.write_table(arrow_table, f, version="1.0")
+
+    with open(filename, 'rb') as f:
+        data = io.BytesIO(f.read())
+
+    table_read = pq.read_table(data)
+    df_read = table_read.to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
 @parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
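
For context, a minimal usage sketch of the code paths this diff enables, not part of the patch itself. It assumes the same aliases the test suite uses (import pyarrow as A, import pyarrow.parquet as pq); the names df, table and buf are illustrative:

import io

import numpy as np
import pandas as pd

import pyarrow as A
import pyarrow.parquet as pq

df = pd.DataFrame({'int64': np.arange(5, dtype=np.int64)})
table = A.from_pandas_dataframe(df)

# Any object with a write() method is wrapped by get_writer() via
# PythonFileInterface, so an in-memory buffer works as a sink.
buf = io.BytesIO()
pq.write_table(table, buf, version='1.0')

# get_reader() accepts file-like objects (anything with a read() method) ...
table_read = pq.read_table(io.BytesIO(buf.getvalue()))

# ... and raw bytes, which it wraps in a BytesReader.
table_read2 = pq.read_table(buf.getvalue())

df_read = table_read.to_pandas()

Note the asymmetry in the new helpers: get_reader() special-cases bytes input, while get_writer() requires either a NativeFile or an object exposing write(), so raw bytes are only accepted on the read side.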