diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 72472a540..028f81796 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometArrowAllocator @@ -148,12 +149,17 @@ class NativeUtil { */ def getNextBatch( numOutputCols: Int, + arrowFfiMetric: Option[SQLMetric], func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = { + + val start = System.nanoTime() val (arrays, schemas) = allocateArrowStructs(numOutputCols) val arrayAddrs = arrays.map(_.memoryAddress()) val schemaAddrs = schemas.map(_.memoryAddress()) + var arrowFfiTime = System.nanoTime() - start + val result = func(arrayAddrs, schemaAddrs) result match { @@ -161,7 +167,10 @@ class NativeUtil { // EOF None case numRows => + val start = System.nanoTime() val cometVectors = importVector(arrays, schemas) + arrowFfiTime += System.nanoTime() - start + arrowFfiMetric.foreach(_.add(arrowFfiTime)) Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt)) case flag => throw new IllegalStateException(s"Invalid native flag: $flag") diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index e0d7c1bc9..a84ff5a34 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -113,8 +113,14 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases: Comet also adds some custom metrics: -### ShuffleWriterExec +### CometScanExec -| Metric | Description | -| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- | +| `scanTime` | Total time to read Parquet batches into JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. | + +### Common to all Comet Operators + +| Metric | Description | +| ---------------- | ------------------------------------------------------------------------------------------------ | +| `arrow_ffi_time` | Measure the time it takes to transfer Arrow batches between JVM and native code using Arrow FFI. | diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index c79eeeb4a..7587ff06d 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); - let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -152,7 +151,6 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, - jvm_fetch_time, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner { } } -#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, @@ -1094,7 +1091,6 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, - jvm_fetch_time: Time, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1108,23 +1104,13 @@ async fn external_shuffle( context.session_config().batch_size(), ); - loop { - let mut timer = jvm_fetch_time.timer(); - let b = input.next().await; - timer.stop(); - - match b { - Some(batch_result) => { - // Block on the repartitioner to insert the batch and shuffle the rows - // into the corresponding partition buffer. - // Otherwise, pull the next batch from the input stream might overwrite the - // current batch in the repartitioner. - block_on(repartitioner.insert_batch(batch_result?))?; - } - _ => break, - } + while let Some(batch) = input.next().await { + // Block on the repartitioner to insert the batch and shuffle the rows + // into the corresponding partition buffer. + // Otherwise, pull the next batch from the input stream might overwrite the + // current batch in the repartitioner. + block_on(repartitioner.insert_batch(batch?))?; } - repartitioner.shuffle_write().await } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2cb8a84d9..cc9a1176f 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,6 +77,8 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, + /// Timer + arrow_ffi_time: Time, } impl ScanExec { @@ -88,6 +90,7 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); + let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -97,8 +100,12 @@ impl ScanExec { // Dictionary-encoded primitive arrays are always unpacked. let first_batch = if let Some(input_source) = input_source.as_ref() { let mut timer = baseline_metrics.elapsed_compute().timer(); - let batch = - ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?; + let batch = ScanExec::get_next( + exec_context_id, + input_source.as_obj(), + data_types.len(), + &arrow_ffi_time, + )?; timer.stop(); batch } else { @@ -124,6 +131,7 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, + arrow_ffi_time, schema, }) } @@ -171,6 +179,7 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), + &self.arrow_ffi_time, )?; *current_batch = Some(next_batch); } @@ -185,6 +194,7 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, + arrow_ffi_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -197,6 +207,7 @@ impl ScanExec { exec_context_id )))); } + let mut timer = arrow_ffi_time.timer(); let mut env = JVMClasses::get_env()?; @@ -255,6 +266,8 @@ impl ScanExec { } } + timer.stop(); + Ok(InputBatch::new(inputs, Some(num_rows as usize))) } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index bff3e7925..860fe4901 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -21,6 +21,7 @@ package org.apache.comet import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized._ import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS} @@ -49,6 +50,7 @@ class CometExecIterator( inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, protobufQueryPlan: Array[Byte], + arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode, numParts: Int, partitionIndex: Int) @@ -102,6 +104,7 @@ class CometExecIterator( nativeUtil.getNextBatch( numOutputCols, + arrowFfiMetric, (arrayAddrs, schemaAddrs) => { val ctx = TaskContext.get() nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 8ea0b1765..2a698c053 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -54,6 +54,9 @@ case class CometCollectLimitExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( + sparkContext, + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 47c89d943..551737b52 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -69,6 +69,9 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM object CometMetricNode { + val ARROW_FFI_TIME_KEY = "arrow_ffi_time" + val ARROW_FFI_TIME_DESCRIPTION = "Arrow FFI time" + /** * The baseline SQL metrics for DataFusion `BaselineMetrics`. */ @@ -77,7 +80,8 @@ object CometMetricNode { "output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"), "elapsed_compute" -> SQLMetrics.createNanoTimingMetric( sc, - "total time (in ms) spent in this operator")) + "total time (in ms) spent in this operator"), + ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sc, ARROW_FFI_TIME_DESCRIPTION)) } /** @@ -93,41 +97,41 @@ object CometMetricNode { * SQL Metrics for DataFusion HashJoin */ def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "build_time" -> - SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), - "build_input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), - "build_input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), - "build_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) + baselineMetrics(sc) ++ + Map( + "build_time" -> + SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"), + "build_input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"), + "build_input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"), + "build_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining")) } /** * SQL Metrics for DataFusion SortMergeJoin */ def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = { - Map( - "peak_mem_used" -> - SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), - "input_batches" -> - SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), - "input_rows" -> - SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), - "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), - "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"), - "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), - "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), - "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), - "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) + baselineMetrics(sc) ++ + Map( + "peak_mem_used" -> + SQLMetrics.createSizeMetric(sc, "Memory used by build-side"), + "input_batches" -> + SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"), + "input_rows" -> + SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"), + "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"), + "spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"), + "spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"), + "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 5582f4d68..92d996c32 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -54,6 +54,9 @@ case class CometTakeOrderedAndProjectExec( private lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( + sparkContext, + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index a7a33c40d..bcb8d094b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,9 +77,9 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( + CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric( sparkContext, - "time fetching batches from JVM"), + CometMetricNode.ARROW_FFI_TIME_DESCRIPTION), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -482,18 +482,11 @@ class CometShuffleWriteProcessor( // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( + CometMetricNode.ARROW_FFI_TIME_KEY -> metrics(CometMetricNode.ARROW_FFI_TIME_KEY), "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { - CometMetricNode( - nativeSQLMetrics ++ Map("jvm_fetch_time" -> - metrics("jvm_fetch_time"))) - } else { - CometMetricNode(nativeSQLMetrics) - } - // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) @@ -501,7 +494,8 @@ class CometShuffleWriteProcessor( Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, nativePlan, - nativeMetrics, + metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), + CometMetricNode(nativeSQLMetrics), numParts, context.partitionId()) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 77188312e..d2003b52a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -127,6 +127,7 @@ object CometExec { inputs, numOutputCols, nativePlan, + None, CometMetricNode(Map.empty), numParts, partitionIdx) @@ -136,6 +137,7 @@ object CometExec { inputs: Seq[Iterator[ColumnarBatch]], numOutputCols: Int, nativePlan: Operator, + arrowFfiMetric: Option[SQLMetric], nativeMetrics: CometMetricNode, numParts: Int, partitionIdx: Int): CometExecIterator = { @@ -148,6 +150,7 @@ object CometExec { inputs, numOutputCols, bytes, + arrowFfiMetric, nativeMetrics, numParts, partitionIdx) @@ -240,6 +243,7 @@ abstract class CometNativeExec extends CometExec { inputs, output.length, serializedPlanCopy, + metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY), nativeMetrics, numParts, partitionIndex)