diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 67f671926561..79f7c7603ba6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -131,8 +131,9 @@ class CodegenContext { def declareMutableStates(): String = { // It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in // `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones. - mutableStates.distinct.map { case (javaType, variableName, _) => + mutableStates.distinct.map { case (javaType, variableName, _) if variableName != "" => s"private $javaType $variableName;" + case _ => "" }.mkString("\n") } @@ -188,6 +189,14 @@ class CodegenContext { /** The variable name of the input row in generated code. */ final var INPUT_ROW = "i" + var isRow = true + var enableColumnCodeGen = false + var iteratorInput = "" + var isRowWrite = true + var generateColumnWrite = false + var rowWriteIdx = "" + var columnarBatch = "" + /** * The map from a variable name to it's next ID. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 6aa9cbf08bdb..169911641c08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -72,12 +72,64 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro inputTypes: Seq[DataType], bufferHolder: String, isTopLevel: Boolean = false): String = { + var colOutVars: Seq[String] = Seq.empty val rowWriterClass = classOf[UnsafeRowWriter].getName val rowWriter = ctx.freshName("rowWriter") - ctx.addMutableState(rowWriterClass, rowWriter, - s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});") + if (!ctx.generateColumnWrite) { + ctx.addMutableState(rowWriterClass, rowWriter, + s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});") + } else if (isTopLevel) { + val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" + + ctx.columnarBatch = ctx.freshName("columnarBatch") + ctx.addMutableState(s"$columnarBatchClz", ctx.columnarBatch, "") + + val metadataEmpty = "org.apache.spark.sql.types.Metadata.empty()" + val columnarBatchAllocate = inputs.zip(inputTypes).zipWithIndex.map { + case ((input, dataType), index) => + val dt = dataType match { + case udt: UserDefinedType[_] => udt.sqlType + case other => other + } + val dtClsName = dt match { + case FloatType => "org.apache.spark.sql.types.DataTypes.FloatType" + case DoubleType => "org.apache.spark.sql.types.DataTypes.DoubleType" + case _ => throw new UnsupportedOperationException() + } + s""" + new org.apache.spark.sql.types.StructField( + "col$index", $dtClsName, ${(input.isNull != "false")}, $metadataEmpty) + """.stripMargin + (if (inputs.length - 1 != index) "," else "") + } - val resetWriter = if (isTopLevel) { + colOutVars = inputs.indices.map(i => 
ctx.freshName("colOutInstance" + i)) + val columnOutAssigns = colOutVars.zipWithIndex.map { case (name, i) => + ctx.addMutableState(columnVectorClz, name, "") + s"$name = ${ctx.columnarBatch}.column($i);" + } + + val batchSchema = ctx.freshName("batchSchema") + val allocateCS = ctx.freshName("allocateColumnarStorage") + ctx.addNewFunction(allocateCS, + s""" + |void $allocateCS() { + |org.apache.spark.sql.types.StructType $batchSchema = + | new org.apache.spark.sql.types.StructType( + | new org.apache.spark.sql.types.StructField[] { + | ${columnarBatchAllocate.mkString("\n")} + |}); + | + |${ctx.columnarBatch} = ${columnarBatchClz}.allocate( + | $batchSchema, org.apache.spark.memory.MemoryMode.ON_HEAP); + |registerColumnarBatch(${ctx.columnarBatch}); + |${columnOutAssigns.mkString("", "\n", "\n")} + |} + """.stripMargin) + ctx.addMutableState("", "", s"$allocateCS();"); + } + + val resetWriter = if (ctx.generateColumnWrite) "" else if (isTopLevel) { // For top level row writer, it always writes to the beginning of the global buffer holder, // which means its fixed-size region always in the same position, so we don't need to call // `reset` to set up its fixed-size region every time. @@ -100,14 +152,21 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro } val tmpCursor = ctx.freshName("tmpCursor") - val setNull = dt match { + val setNull = if (ctx.generateColumnWrite) { + s"${colOutVars(index)}.putNull(${ctx.rowWriteIdx});" + } else dt match { case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS => // Can't call setNullAt() for DecimalType with precision larger than 18. s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});" case _ => s"$rowWriter.setNullAt($index);" } - val writeField = dt match { + val writeField = if (ctx.generateColumnWrite) { + s""" + System.out.println("rowIdx["+${ctx.rowWriteIdx}+"]: v="+${input.value}); + ${colOutVars(index)}.putFloat(${ctx.rowWriteIdx}, ${input.value}); + """.stripMargin + } else dt match { case t: StructType => s""" // Remember the current cursor so that we can calculate how many bytes are @@ -299,19 +358,23 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination) val exprTypes = expressions.map(_.dataType) - val numVarLenFields = exprTypes.count { + val numVarLenFields = if (ctx.generateColumnWrite) 0 else exprTypes.count { case dt if UnsafeRow.isFixedLength(dt) => false // TODO: consider large decimal and interval type case _ => true } val result = ctx.freshName("result") - ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});") + if (!ctx.generateColumnWrite) { + ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});") + } val holder = ctx.freshName("holder") - val holderClass = classOf[BufferHolder].getName - ctx.addMutableState(holderClass, holder, - s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});") + if (!ctx.generateColumnWrite) { + val holderClass = classOf[BufferHolder].getName + ctx.addMutableState(holderClass, holder, + s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});") + } val resetBufferHolder = if (numVarLenFields == 0) { "" diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java index 086547c793e3..70b544186f2d 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -25,6 +25,8 @@ import org.apache.spark.TaskContext; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.execution.vectorized.ColumnVector; /** * An iterator interface used to pull the output from generated function for multiple operators @@ -32,6 +34,9 @@ */ public abstract class BufferedRowIterator { protected LinkedList currentRows = new LinkedList<>(); + protected ColumnarBatch columnarBatch; + protected java.util.Iterator rowIterator; + protected boolean isColumnarBatchAccessed = false; // used when there is no column in output protected UnsafeRow unsafeRow = new UnsafeRow(0); private long startTimeNs = System.nanoTime(); @@ -42,11 +47,15 @@ public boolean hasNext() throws IOException { if (currentRows.isEmpty()) { processNext(); } - return !currentRows.isEmpty(); + if (!isColumnarBatchAccessed) { return !currentRows.isEmpty(); } + if (rowIterator == null) { rowIterator = columnarBatch.rowIterator(); } + return rowIterator.hasNext(); } public InternalRow next() { - return currentRows.remove(); + if (!isColumnarBatchAccessed) { return currentRows.remove(); } + if (rowIterator == null) { rowIterator = columnarBatch.rowIterator(); } + return rowIterator.next().copyUnsafeRow(); } /** @@ -75,9 +84,16 @@ protected void append(InternalRow row) { * If it returns true, the caller should exit the loop (return from processNext()). */ protected boolean shouldStop() { - return !currentRows.isEmpty(); + if (!isColumnarBatchAccessed) { return !currentRows.isEmpty(); } + return false; // TODO: currentIdx < allocated # of rows of CS } + protected boolean isColumnarBatch() { return this.columnarBatch != null; } + protected int numColumns() { return this.columnarBatch.numCols(); } + protected int numRows() { return this.columnarBatch.numRows(); } + protected ColumnVector column(int i) { return this.columnarBatch.column(i); } + protected void registerColumnarBatch(ColumnarBatch columnarBatch) { this.columnarBatch = columnarBatch; } + /** * Increase the peak execution memory for current task. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 8cece73faa4b..61a1b7491c35 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -164,6 +164,28 @@ public InternalRow copy() { return row; } + public UnsafeRow copyUnsafeRow() { + UnsafeRow row = new UnsafeRow(columns.length); + int fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 8 * row.numFields(); + byte[] buffer = new byte[fixedSize + 64]; // 64 is margin + row.pointTo(buffer, buffer.length); + for (int i = 0; i < numFields(); i++) { + if (isNullAt(i)) { + row.setNullAt(i); + } else { + DataType dt = columns[i].dataType(); + if (dt instanceof FloatType) { + row.setFloat(i, getFloat(i)); + } else if (dt instanceof DoubleType) { + row.setDouble(i, getDouble(i)); + } else { + throw new RuntimeException("Not implemented. 
" + dt); + } + } + } + return row; + } + @Override public boolean anyNull() { throw new NotImplementedException(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 85af4faf4d09..db1ae97b52fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -304,19 +304,28 @@ private[sql] case class BatchedDataSourceScanExec( |}""".stripMargin) ctx.currentVars = null + ctx.isRow = false // always false + ctx.isRowWrite = false // always false val rowidx = ctx.freshName("rowIdx") + val rowWriteIdx = ctx.freshName("rowWriteIdx") + ctx.rowWriteIdx = rowWriteIdx val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } - s""" + val isColumnarBatchAccessed = if (ctx.isRowWrite) "" else "isColumnarBatchAccessed = true;" + val source = s""" |if ($batch == null) { + | $isColumnarBatchAccessed | $nextBatch(); |} + |int $rowWriteIdx = 0; |while ($batch != null) { | int numRows = $batch.numRows(); | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columnsBatchInput).trim} + | $rowWriteIdx++; + | ${ctx.columnarBatch}.setNumRows($rowWriteIdx); | if (shouldStop()) return; | } | $batch = null; @@ -325,6 +334,9 @@ private[sql] case class BatchedDataSourceScanExec( |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); |$scanTimeTotalNs = 0; """.stripMargin + ctx.isRowWrite = true // always true + ctx.isRow = true // always true + source } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index d6f7b6ed35db..6a551fe2d5f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -132,7 +132,9 @@ trait CodegenSupport extends SparkPlan { val evaluateInputs = evaluateVariables(outputVars) // generate the code to create a UnsafeRow ctx.currentVars = outputVars + ctx.generateColumnWrite = !ctx.isRowWrite && parent.isInstanceOf[WholeStageCodegenExec] val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false) + ctx.generateColumnWrite = false val code = s""" |$evaluateInputs |${ev.code.trim} @@ -352,13 +354,36 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co val clazz = CodeGenerator.compile(cleanedSource) val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator] buffer.init(index, Array(iter)) - new Iterator[InternalRow] { - override def hasNext: Boolean = { - val v = buffer.hasNext - if (!v) durationMs += buffer.durationMs() - v + if (!buffer.isColumnarBatch()) { + new Iterator[InternalRow] { + override def hasNext: Boolean = { + val v = buffer.hasNext + if (!v) durationMs += buffer.durationMs() + v + } + override def next: InternalRow = buffer.next() + } + } else { + new ColumnIterator[InternalRow] { + override def hasNext: Boolean = { + val v = buffer.hasNext + if (!v) durationMs += buffer.durationMs() + v + } + override def next: InternalRow = buffer.next() + + override def computeColumn: Unit = { + buffer.processNext + } + override def numColumns: Integer = buffer.numColumns + override def numRows: Integer = buffer.numRows + override def column(i: Integer): + 
org.apache.spark.sql.execution.vectorized.ColumnVector = buffer.column(i) + override def hasNextRow: Boolean = { + if (numRows == 0) { buffer.processNext } + if (rowIdx < numRows) true else false + } } - override def next: InternalRow = buffer.next() } } } else { @@ -394,9 +419,10 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co } else { "" } + val append = if (ctx.isRowWrite) s"append(${row.value}$doCopy);" else "" s""" |${row.code} - |append(${row.value}$doCopy); + |$append """.stripMargin.trim } @@ -416,6 +442,17 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def simpleString: String = "WholeStageCodegen" } +abstract class ColumnIterator[T] extends Iterator[T] { + def computeColumn: Unit = { } + def columnarBatch: T = { null.asInstanceOf[T] } + def numColumns: Integer = { 0 } + def numRows: Integer = { 0 } + def column(i: Integer): org.apache.spark.sql.execution.vectorized.ColumnVector = { + null + } + def hasNextRow: Boolean = { false } + var rowIdx: Integer = 0 +} /** * Find the chained plans that support codegen, collapse them together as WholeStageCodegen. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala index d30655e0c4a2..48fed33f1f3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala @@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.columnar.ColumnBuilder._ import org.apache.spark.sql.execution.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} +import org.apache.spark.sql.execution.vectorized.ColumnVector import org.apache.spark.sql.types._ private[columnar] trait ColumnBuilder { @@ -44,6 +45,10 @@ private[columnar] trait ColumnBuilder { * Returns the final columnar byte buffer. 
*/ def build(): ByteBuffer + + def appendColumn(columnVector: ColumnVector, numRows: Integer): Unit = { + throw new UnsupportedOperationException() + } } private[columnar] class BasicColumnBuilder[JvmType]( @@ -72,6 +77,39 @@ private[columnar] class BasicColumnBuilder[JvmType]( columnType.append(row, ordinal, buffer) } + override def appendColumn(column: ColumnVector, numRows: Integer): Unit = { + // val row = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1) + if (columnType.isInstanceOf[FLOAT.type]) { + buffer = ensureFreeSpace(buffer, columnType.defaultSize * numRows) + var i = 0 + while (i < numRows) { + if (!column.isNullAt(i)) { + val v = column.getFloat(i) + print(s"i=$i, v=$v, nRows=$numRows, columnType=${columnType.getClass.getName}\n") + // row.setFloat(0, v) + // columnType.append(row, 0, buffer) + buffer.putFloat(v) + } + i += 1 + } + } else if (columnType.isInstanceOf[DOUBLE.type]) { + buffer = ensureFreeSpace(buffer, columnType.defaultSize * numRows) + var i = 0 + while (i < numRows) { + if (!column.isNullAt(i)) { + val v = column.getDouble(i) + print(s"i=$i, v=$v, nRows=$numRows, columnType=${columnType.getClass.getName}\n") + // row.setDouble(0, v) + // columnType.append(row, 0, buffer) + buffer.putDouble(v) + } + i += 1 + } + } else { + throw new UnsupportedOperationException() + } + } + override def build(): ByteBuffer = { if (buffer.capacity() > buffer.position() * 1.1) { // trim the buffer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 009fbaa00657..e16ce99bc738 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -135,6 +135,13 @@ private[sql] case class InMemoryRelation( val output = child.output val cached = child.execute().mapPartitionsInternal { rowIterator => new Iterator[CachedBatch] { + val columnIterator = if ( + rowIterator.isInstanceOf[org.apache.spark.sql.execution.ColumnIterator[InternalRow]]) { + rowIterator.asInstanceOf[org.apache.spark.sql.execution.ColumnIterator[InternalRow]] + } else { + null + } + def next(): CachedBatch = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) @@ -142,28 +149,44 @@ private[sql] case class InMemoryRelation( var rowCount = 0 var totalSize = 0L - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.length, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") + if (columnIterator == null) { + while (rowIterator.hasNext && rowCount < batchSize + && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { + val row = rowIterator.next() + // Added for SPARK-6082. This assertion can be useful for scenarios when something + // like Hive TRANSFORM is used. 
The external data generation script used in TRANSFORM + // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat + // hard to decipher. + assert( + row.numFields == columnBuilders.length, + s"Row column number mismatch, expected ${output.size} columns, " + + s"but got ${row.numFields}." + + s"\nRow content: $row") + + var i = 0 + totalSize = 0 + while (i < row.numFields) { + columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes + i += 1 + } + rowCount += 1 + } + } else { + val numColumns = columnIterator.numColumns + val numRows = columnIterator.numRows var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes + while (i < numColumns) { + val name = columnBuilders(i).getClass.getName + print(s"columnBuilder[$i]: $name, numColumns=$numColumns, numRows=$numRows\n") + columnIterator.rowIdx = 0 + columnBuilders(i).appendColumn(columnIterator.column(i), numRows) + columnIterator.rowIdx = numRows i += 1 } - rowCount += 1 + rowCount = columnIterator.rowIdx } val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) @@ -175,7 +198,9 @@ private[sql] case class InMemoryRelation( }, stats) } - def hasNext: Boolean = rowIterator.hasNext + def hasNext: Boolean = { + if (columnIterator == null) rowIterator.hasNext else columnIterator.hasNextRow + } } }.persist(storageLevel) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala index 3a1931bfb5c8..c8f0476a166e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala @@ -63,6 +63,22 @@ private[columnar] trait NullableColumnBuilder extends ColumnBuilder { pos += 1 } + override def appendColumn( + column: org.apache.spark.sql.execution.vectorized.ColumnVector, numRows: Integer): Unit = { + if (!column.anyNullsSet()) { + var i = 0; + while (i < numRows) { + if (column.isNullAt(i)) { + nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) + nulls.putInt(i) + nullCount += 1 + } + i += 1 + } + } + super.appendColumn(column, numRows) + } + abstract override def build(): ByteBuffer = { val nonNulls = super.build() val nullDataLen = nulls.position()
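// A minimal standalone sketch (not part of the patch) of the bulk append path that
// BasicColumnBuilder.appendColumn implements above: copy a whole ColumnVector into the
// builder's ByteBuffer in one pass instead of calling appendFrom(row, ordinal) per row.
// FloatColumnLike is a hypothetical stand-in for
// org.apache.spark.sql.execution.vectorized.ColumnVector (only isNullAt/getFloat are
// assumed), and ensureFreeSpace is a simplified version of ColumnBuilder.ensureFreeSpace.
import java.nio.{ByteBuffer, ByteOrder}

trait FloatColumnLike {
  def isNullAt(rowId: Int): Boolean
  def getFloat(rowId: Int): Float
}

object ColumnAppendSketch {
  // Grow the buffer when it cannot hold `size` more bytes (simplified growth policy).
  private def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) {
      orig
    } else {
      val capacity = orig.capacity
      val newSize = capacity + size.max(capacity)
      val pos = orig.position()
      ByteBuffer
        .allocate(newSize)
        .order(ByteOrder.nativeOrder())
        .put(orig.array(), 0, pos)
    }
  }

  // Bulk-copy all non-null float values of one column, mirroring the FLOAT branch of
  // appendColumn; this skips the per-row GenericMutableRow round trip that the
  // commented-out lines in the patch show.
  def appendFloatColumn(buffer: ByteBuffer, column: FloatColumnLike, numRows: Int): ByteBuffer = {
    val buf = ensureFreeSpace(buffer, 4 * numRows)
    var i = 0
    while (i < numRows) {
      if (!column.isNullAt(i)) buf.putFloat(column.getFloat(i))
      i += 1
    }
    buf
  }
}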