This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-958] Fix SQLMetrics inaccuracy in JVM/Native R2C and CoalesceBatch #959

Merged
1 commit merged on Jun 8, 2022
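What the metric inaccuracy is: these operators process many batches per task, and calling SQLMetric.set() on every batch overwrites the metric, so only the last batch's time was reported. The fix switches to +=, which accumulates across batches. A minimal, self-contained sketch of the difference (plain vars standing in for the real SQLMetric, values invented for illustration):

```scala
// Sketch: why set() under-reports a per-batch timing metric.
// Values are invented; the real SQLMetric should sum over every batch a task processes.
val perBatchMillis = Seq(5L, 7L, 3L)

var overwritten = 0L
perBatchMillis.foreach(t => overwritten = t)   // set(): keeps only the last batch -> 3

var accumulated = 0L
perBatchMillis.foreach(t => accumulated += t)  // +=  : sums all batches -> 15
```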
Changes from all commits
@@ -297,7 +297,7 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
           rowCount += 1
         }
         vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-        processTime.set(NANOSECONDS.toMillis(elapse))
+        processTime += NANOSECONDS.toMillis(elapse)
         numInputRows += rowCount
         numOutputBatches += 1
         last_cb = new ColumnarBatch(vectors.toArray, rowCount)
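The hunk above fixes the JVM-side row-to-columnar loop in RowToArrowColumnarExec. A self-contained sketch of the corrected per-batch pattern (stand-in batches, and a plain var in place of the real SQLMetric):

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

// Self-contained paraphrase of the fixed per-batch pattern; `processTime`
// stands in for the operator's SQLMetric (a plain var here).
var processTime = 0L
val batches = Seq(Seq(1, 2, 3), Seq(4, 5))      // stand-ins for incoming row batches

batches.foreach { batch =>
  val start = System.nanoTime()
  batch.foreach(_ => ())                        // stand-in for the row-to-column writes
  val elapse = System.nanoTime() - start
  processTime += NANOSECONDS.toMillis(elapse)   // was processTime.set(...), which lost prior batches
}
```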
@@ -102,7 +102,6 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
       val arrBufSizes = new ArrayBuffer[Array[Long]]()
       val numrows = ListBuffer[Int]()

-      val beforeConcat = System.nanoTime
       while (hasNext && rowCount < recordsPerBatch) {
         val delta: ColumnarBatch = iter.next()
         delta.retain()
@@ -134,16 +133,17 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
       val schema = new Schema(expected_output_arrow_fields.asJava)
       val arrowSchema = ConverterUtils.getSchemaBytesBuf(schema)

+      val beforeConcat = System.nanoTime
       val serializedRecordBatch = jniWrapper.nativeCoalesceBatches(
         arrowSchema, rowCount, numrows.toArray, arrBufAddrs.toArray, arrBufSizes.toArray,
         SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+      concatTime += System.nanoTime - beforeConcat
       val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(SparkMemoryUtils.contextAllocator(), serializedRecordBatch)
       val ColVecArr = ConverterUtils.fromArrowRecordBatch(schema, rb)
       val outputNumRows = rb.getLength
       ConverterUtils.releaseArrowRecordBatch(rb)
       val bigColBatch = new ColumnarBatch(ColVecArr.map(v => v.asInstanceOf[ColumnVector]).toArray, rowCount)

-      concatTime += System.nanoTime - beforeConcat
       numOutputRows += rowCount
       numInputBatches += batchesToAppend.length
       numOutputBatches += 1
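Besides the set()/+= change elsewhere in this PR, the two hunks above tighten what concatTime measures in ArrowCoalesceBatchesExec: the timer used to start before the input batches were even collected, so iterator consumption, batch deserialization, and JVM-side bookkeeping were all charged to concat; now it brackets only the native nativeCoalesceBatches call. A self-contained schematic under stand-in workloads (the sleeps are invented placeholders):

```scala
// Self-contained schematic of the tightened window; the sleeps stand in for
// (a) collecting input batches on the JVM side and (b) the native coalesce call.
def collectInputBatches(): Unit = Thread.sleep(10)   // JVM-side work, not concat
def nativeCoalesce(): Unit      = Thread.sleep(5)    // the work concatTime should measure

var concatTime = 0L

collectInputBatches()                        // the old code started the timer before this
val beforeConcat = System.nanoTime
nativeCoalesce()                             // the new code times only the native call
concatTime += System.nanoTime - beforeConcat
```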
@@ -146,7 +146,6 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
       val rowLength = new ListBuffer[Long]()
       var rowCount = 0
       var offset = 0
-      val start = System.nanoTime()

       assert(firstRow.isInstanceOf[UnsafeRow])
       val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
@@ -180,8 +179,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           if (schemaBytes == null) {
             schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
           }
+          val start = System.nanoTime()
           val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(schemaBytes, rowLength.toArray,
             arrowBuf.memoryAddress(), SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+          elapse = System.nanoTime() - start
           numInputRows += rowCount
           numOutputBatches += 1
           val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
@@ -190,8 +191,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           ConverterUtils.releaseArrowRecordBatch(rb)
           arrowBuf.close()
           last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
-          elapse = System.nanoTime() - start
-          processTime.set(NANOSECONDS.toMillis(elapse))
+          processTime += NANOSECONDS.toMillis(elapse)
           last_cb
         } else {
           logInfo("not unsaferow, fallback to java based r2c")
@@ -212,7 +212,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
             rowCount += 1
           }
           vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-          processTime.set(NANOSECONDS.toMillis(elapse))
+          processTime += NANOSECONDS.toMillis(elapse)
           numInputRows += rowCount
           numOutputBatches += 1
           last_cb = new ColumnarBatch(vectors.toArray, rowCount)
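ArrowRowToColumnarExec gets both corrections at once: on the native path the timer now brackets only the nativeConvertRowToColumnar call (row buffering into arrowBuf and batch construction are no longer counted), and both the native and the java fallback paths accumulate with += instead of set(). A combined self-contained sketch with invented stand-ins:

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

// Stand-ins for the real work; only the native conversion should be charged.
def bufferRows(): Unit          = Thread.sleep(10)   // copying UnsafeRows into the buffer
def nativeRowToColumnar(): Unit = Thread.sleep(5)    // the JNI conversion itself

var processTime = 0L
(1 to 3).foreach { _ =>                        // one iteration per output batch
  bufferRows()                                 // previously inside the timed window
  val start = System.nanoTime()
  nativeRowToColumnar()
  val elapse = System.nanoTime() - start
  processTime += NANOSECONDS.toMillis(elapse)  // was set(): only the last batch survived
}
```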