File tree Expand file tree Collapse file tree 1 file changed +10
-2
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 1 file changed +10
-2
lines changed Original file line number Diff line number Diff line change @@ -127,14 +127,22 @@ case class PythonMapPartitions(
127127 reuseWorker
128128 ).compute(inputIterator, context.partitionId(), context)
129129
130+ val resultProj = UnsafeProjection .create(output, output)
131+
130132 if (outputIsPickled) {
131- outputIterator.map(bytes => InternalRow (bytes))
133+ val row = new GenericMutableRow (1 )
134+ outputIterator.map { bytes =>
135+ row(0 ) = bytes
136+ resultProj(row)
137+ }
132138 } else {
133139 val unpickle = new Unpickler
134140 outputIterator.flatMap { pickedResult =>
135141 val unpickledBatch = unpickle.loads(pickedResult)
136142 unpickledBatch.asInstanceOf [java.util.ArrayList [Any ]].asScala
137- }.map(result => EvaluatePython .fromJava(result, schema).asInstanceOf [InternalRow ])
143+ }.map { result =>
144+ resultProj(EvaluatePython .fromJava(result, schema).asInstanceOf [InternalRow ])
145+ }
138146 }
139147 }
140148 }
You can’t perform that action at this time.
0 commit comments