diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d5cf1e713c6f..c3f1a11697d2 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -566,8 +566,8 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.open_stream(stream)
         for batch in reader:
-            for row_index in xrange(0, batch.num_rows):
-                yield [batch[col_index][row_index].as_py() for col_index in xrange(0, batch.num_columns)]
+            for row in zip(*[batch[col].to_pylist() for col in range(batch.num_columns)]):
+                yield row
 
     def dump_stream(self, iterator, stream):
         import pyarrow as pa
@@ -575,14 +575,15 @@ def dump_stream(self, iterator, stream):
         writer = pa.RecordBatchStreamWriter(stream, self.schema)
         row_id = 0
         # todo: we should append data to arrow vector directly, but I can't find the API...
-        column_chunks = [[] for col_index in xrange(0, len(self.schema))]
+        column_chunks = [[] for col_index in xrange(len(self.schema))]
+        appends = [column_chunks[i].append for i in xrange(len(self.schema))]
         for row in iterator:
             if row_id < 10000:
                 if len(self.schema) == 1:
-                    column_chunks[0].append(row)
+                    appends[0](row)
                 else:
-                    for col_index in xrange(0, len(self.schema)):
-                        column_chunks[col_index].append(row[col_index])
+                    for col_index in xrange(len(self.schema)):
+                        appends[col_index](row[col_index])
                 row_id += 1
             else:
                 self._write(column_chunks, writer)
@@ -594,7 +595,7 @@ def dump_stream(self, iterator, stream):
     def _write(self, column_chunks, writer):
         import pyarrow as pa
         vectors = []
-        for col_index in xrange(0, len(self.schema)):
+        for col_index in xrange(len(self.schema)):
            # todo: can we reuse the vector?
            vector = pa.array(column_chunks[col_index], self.schema[col_index].type)
            vectors.append(vector)
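
For reference, a minimal standalone sketch (not part of the patch) of the column-to-row trick the new load_stream relies on: each Arrow column is converted to a Python list once, and zip(*) transposes the columns into row tuples. The batch contents and column names below are made up for illustration; only standard pyarrow calls (pa.array, RecordBatch.from_arrays, Array.to_pylist) are assumed.

import pyarrow as pa

# Hypothetical two-column batch, standing in for one read from the Arrow stream.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["id", "name"],
)

# Convert each column to a Python list once, then transpose into row tuples,
# mirroring what the patched load_stream yields per record batch.
columns = [batch[col].to_pylist() for col in range(batch.num_columns)]
for row in zip(*columns):
    print(row)  # (1, 'a'), (2, 'b'), (3, 'c')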