2121
2222from pyarrow.includes.libarrow cimport *
2323from pyarrow.includes.parquet cimport *
24- from pyarrow.includes.libarrow_io cimport ReadableFileInterface
24+ from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
2525cimport pyarrow.includes.pyarrow as pyarrow
2626
2727from pyarrow.array cimport Array
@@ -151,15 +151,15 @@ def read_table(source, columns=None):
151151 return Table.from_arrays(columns, arrays)
152152
153153
154- def write_table (table , filename , chunk_size = None , version = None ,
154+ def write_table (table , sink , chunk_size = None , version = None ,
155155 use_dictionary = True , compression = None ):
156156 """
157157 Write a Table to Parquet format
158158
159159 Parameters
160160 ----------
161161 table : pyarrow.Table
162- filename : string
162+ sink : string or pyarrow.io.NativeFile
163163 chunk_size : int
164164 The maximum number of rows in each Parquet RowGroup. As a default,
165165 we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
173173 """
174174 cdef Table table_ = table
175175 cdef CTable* ctable_ = table_.table
176- cdef shared_ptr[ParquetOutputStream] sink
176+ cdef shared_ptr[ParquetWriteSink] sink_
177+ cdef shared_ptr[FileOutputStream] filesink_
177178 cdef WriterProperties.Builder properties_builder
178179 cdef int64_t chunk_size_ = 0
179180 if chunk_size is None :
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
230231 else :
231232 raise ArrowException(" Unsupport compression codec" )
232233
233- sink.reset(new LocalFileOutputStream(tobytes(filename)))
234+ if isinstance (sink, six.string_types):
235+ check_status(FileOutputStream.Open(tobytes(sink), & filesink_))
236+ sink_.reset(new ParquetWriteSink(< shared_ptr[OutputStream]> filesink_))
237+ elif isinstance (sink, NativeFile):
238+ sink_.reset(new ParquetWriteSink((< NativeFile> sink).wr_file))
239+
234240 with nogil:
235- check_status(WriteFlatTable(ctable_, default_memory_pool(), sink ,
241+ check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_ ,
236242 chunk_size_, properties_builder.build()))
0 commit comments