diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 5a639baafec..665fe2bfe7c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -130,7 +130,7 @@ class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric) * @note The RAPIDS shuffle does not use this code. */ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Array[DataType], - useKudo: Boolean) + useKudo: Boolean, kudoMeasureBufferCopy: Boolean) extends Serializer with Serializable { private lazy val kudo = { @@ -143,7 +143,7 @@ class GpuColumnarBatchSerializer(metrics: Map[String, GpuMetric], dataTypes: Arr override def newInstance(): SerializerInstance = { if (useKudo) { - new KudoSerializerInstance(metrics, dataTypes, kudo) + new KudoSerializerInstance(metrics, dataTypes, kudo, kudoMeasureBufferCopy) } else { new GpuColumnarBatchSerializerInstance(metrics) } @@ -348,7 +348,8 @@ object SerializedTableColumn { private class KudoSerializerInstance( val metrics: Map[String, GpuMetric], val dataTypes: Array[DataType], - val kudo: Option[KudoSerializer] + val kudo: Option[KudoSerializer], + val measureBufferCopyTime: Boolean, ) extends SerializerInstance { private val dataSize = metrics(METRIC_DATA_SIZE) private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME) @@ -399,8 +400,10 @@ private class KudoSerializerInstance( dataSize += writeMetric.getWrittenBytes serCalcHeaderTime += writeMetric.getCalcHeaderTime - serCopyHeaderTime += writeMetric.getCopyHeaderTime - serCopyBufferTime += writeMetric.getCopyBufferTime + if (measureBufferCopyTime) { + serCopyHeaderTime += writeMetric.getCopyHeaderTime + serCopyBufferTime += writeMetric.getCopyBufferTime + } } } else { withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 104b2e808be..f218f866cac 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2025,6 +2025,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .booleanConf .createWithDefault(false) + val SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED = + conf("spark.rapids.shuffle.kudo.serializer.measure.buffer.copy.enabled") + .doc("Enable or disable measuring buffer copy time when using Kudo serializer for the shuffle.") + .internal() + .startupOnly() + .booleanConf + .createWithDefault(false) + + // USER FACING DEBUG CONFIGS val SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY = @@ -3112,6 +3121,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleKudoSerializerEnabled: Boolean = get(SHUFFLE_KUDO_SERIALIZER_ENABLED) + lazy val shuffleKudoMeasureBufferCopyEnabled: Boolean = + get(SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED) + def isUCXShuffleManagerMode: Boolean = RapidsShuffleManagerMode .withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 0e1b857317c..978caaacad2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -172,6 +172,9 @@ abstract class GpuShuffleExchangeExecBase( import GpuMetric._ private lazy val useKudo = RapidsConf.SHUFFLE_KUDO_SERIALIZER_ENABLED.get(child.conf) + private lazy val kudoMeasureBufferCopy = RapidsConf + .SHUFFLE_KUDO_SERIALIZER_MEASURE_BUFFER_COPY_ENABLED + .get(child.conf) private lazy val useGPUShuffle = { gpuOutputPartitioning match { @@ -223,7 +226,7 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - allMetrics, sparkTypes, useKudo) + allMetrics, sparkTypes, useKudo, kudoMeasureBufferCopy) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() diff --git a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala index c006031da7d..de1e40f43ac 100644 --- a/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala +++ b/tests/src/test/spark321/scala/org/apache/spark/sql/rapids/RapidsShuffleThreadedReaderSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,7 +114,7 @@ class RapidsShuffleThreadedReaderSuite val numMaps = 6 val keyValuePairsPerMap = 10 val serializer = new GpuColumnarBatchSerializer(Map.empty.withDefaultValue(NoopMetric), - Array.empty, false) + Array.empty, false, false) // Make a mock BlockManager that will return RecordingManagedByteBuffers of data, so that we // can ensure retain() and release() are properly called.