From 0b527e0fb47ff83c0e1d631f4a0baf179896fa1d Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Fri, 15 Jul 2022 17:32:18 -0700
Subject: [PATCH 1/3] Ability to expose driver metrics

---
 .../apache/spark/sql/connector/read/Scan.java      | 11 +++
 .../spark/sql/connector/write/Write.java           | 11 +++
 .../datasources/v2/BatchScanExec.scala             |  5 +-
 .../v2/DataSourceV2ScanExecBase.scala              | 14 +++-
 .../ui/SQLAppStatusListenerSuite.scala             | 70 +++++++++++++++++++
 5 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 941a11b8b1d3..081fc658a2d6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -19,6 +19,7 @@

 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
@@ -127,4 +128,14 @@ default ContinuousStream toContinuousStream(String checkpointLocation) {
   default CustomMetric[] supportedCustomMetrics() {
     return new CustomMetric[]{};
   }
+
+  /**
+   * Return an array of custom metrics which are collected with values at the driver side only.
+   * Note that these metrics must be included in the supported custom metrics reported by
+   * `supportedCustomMetrics`.
+   */
+  default CustomTaskMetric[] reportCustomDriverMetrics() {
+    CustomTaskMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
index 7da5d0c83f45..25c64c021133 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
@@ -21,6 +21,7 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

 /**
@@ -71,4 +72,14 @@ default StreamingWrite toStreaming() {
   default CustomMetric[] supportedCustomMetrics() {
     return new CustomMetric[]{};
   }
+
+  /**
+   * Return an array of custom metrics which are collected with values at the driver side only.
+   * Note that these metrics must be included in the supported custom metrics reported by
+   * `supportedCustomMetrics`.
+   */
+  default CustomTaskMetric[] reportCustomDriverMetrics() {
+    CustomTaskMetric[] NO_METRICS = {};
+    return NO_METRICS;
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 8da1123c9fe2..d6ff5a152f41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.v2

 import com.google.common.base.Objects
-
 import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -108,13 +107,15 @@ case class BatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

   override lazy val inputRDD: RDD[InternalRow] = {
-    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
       // return an empty RDD with 1 partition if dynamic filtering removed the only split
       sparkContext.parallelize(Array.empty[InternalRow], 1)
     } else {
       new DataSourceRDD(
         sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
     }
+    postDriverMetrics()
+    rdd
   }

   override def doCanonicalize(): BatchScanExec = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index c5926edb6c5b..71476128e4cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
-import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
+import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SupportsMetadata
@@ -168,6 +168,18 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
     }
   }

+  protected def postDriverMetrics(): Unit = {
+    val driveSQLMetrics = scan.reportCustomDriverMetrics().map( customTaskMetric => {
+      val metric = metrics(customTaskMetric.name())
+      metric.set(customTaskMetric.value())
+      metric
+    })
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      driveSQLMetrics)
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 226b6e47a96f..9528843835ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -873,6 +873,41 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     assert(metrics(innerMetric.id) === expectedInnerValue)
   }

+  test("Report driver metrics from Datasource v2 scan") {
+    val statusStore = spark.sharedState.statusStore
+    val oldCount = statusStore.executionsList().size
+
+    val schema = new StructType().add("i", "int").add("j", "int")
+    val physicalPlan = BatchScanExec(schema.toAttributes, new CustomDriverMetricScanBuilder(),
+      Seq.empty)
+    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+      override lazy val sparkPlan = physicalPlan
+      override lazy val executedPlan = physicalPlan
+    }
+
+    SQLExecution.withNewExecutionId(dummyQueryExecution) {
+      physicalPlan.execute().collect()
+    }
+
+    // Wait until the new execution is started and being tracked.
+    while (statusStore.executionsCount() < oldCount) {
+      Thread.sleep(100)
+    }
+
+    // Wait for listener to finish computing the metrics for the execution.
+    while (statusStore.executionsList().isEmpty ||
+        statusStore.executionsList().last.metricValues == null) {
+      Thread.sleep(100)
+    }
+
+    val execId = statusStore.executionsList().last.executionId
+    val metrics = statusStore.executionMetrics(execId)
+    val expectedMetric = physicalPlan.metrics("custom_driver_metric_partition_count")
+    val expectedValue = "2"
+    assert(metrics.contains(expectedMetric.id))
+    assert(metrics(expectedMetric.id) === expectedValue)
+  }
+
   test("SPARK-36030: Report metrics from Datasource v2 write") {
     withTempDir { dir =>
       val statusStore = spark.sharedState.statusStore
@@ -1037,6 +1072,19 @@ class SimpleCustomMetric extends CustomMetric {
   }
 }

+class SimpleCustomDriverMetric extends CustomMetric {
+  override def name(): String = "custom_driver_metric_partition_count"
+  override def description(): String = "Simple custom driver metrics - partition count"
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    taskMetrics.sum.toString
+  }
+}
+
+class SimpleCustomDriverTaskMetric(value : Long) extends CustomTaskMetric {
+  override def name(): String = "custom_driver_metric_partition_count"
+  override def value(): Long = value
+}
+
 class BytesWrittenCustomMetric extends CustomMetric {
   override def name(): String = "bytesWritten"
   override def description(): String = "bytesWritten metric"
@@ -1096,6 +1144,28 @@ class CustomMetricScanBuilder extends SimpleScanBuilder {
   override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory
 }

+class CustomDriverMetricScanBuilder extends SimpleScanBuilder {
+
+  var partitionCount: Long = 0L
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val partitions: Array[InputPartition] = Array(RangeInputPartition(0, 5),
+      RangeInputPartition(5, 10))
+    partitionCount = partitions.length
+    partitions
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory
+
+  override def supportedCustomMetrics(): Array[CustomMetric] = {
+    Array(new SimpleCustomDriverMetric)
+  }
+
+  override def reportCustomDriverMetrics(): Array[CustomTaskMetric] = {
+    Array(new SimpleCustomDriverTaskMetric(partitionCount))
+  }
+}
+
 class CustomMetricsCSVDataWriter(fs: FileSystem, file: Path) extends CSVDataWriter(fs, file) {
   override def currentMetricsValues(): Array[CustomTaskMetric] = {
     val metric = new CustomTaskMetric {

From e31ce4f358f700e49b9e223e11947d21c1adfcec Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Sun, 17 Jul 2022 06:23:12 -0700
Subject: [PATCH 2/3] Fix checkstyle, address review comments

---
 .../spark/sql/execution/datasources/v2/BatchScanExec.scala | 1 +
 .../spark/sql/execution/ui/SQLAppStatusListenerSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index d6ff5a152f41..23a1ee23893a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2

 import com.google.common.base.Objects
+
 import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9528843835ae..de65babbf1f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -873,7 +873,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     assert(metrics(innerMetric.id) === expectedInnerValue)
   }

-  test("Report driver metrics from Datasource v2 scan") {
+  test("SPARK-39635: Report driver metrics from Datasource v2 scan") {
     val statusStore = spark.sharedState.statusStore
     val oldCount = statusStore.executionsList().size

From 717e7575abad033251e2e661e0bcbcc6702491ca Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Thu, 21 Jul 2022 11:09:22 -0700
Subject: [PATCH 3/3] Address review comments

---
 .../org/apache/spark/sql/connector/read/Scan.java        |  7 +++----
 .../org/apache/spark/sql/connector/write/Write.java      | 11 -----------
 .../execution/datasources/v2/ContinuousScanExec.scala    |  4 +++-
 .../datasources/v2/DataSourceV2ScanExecBase.scala        |  2 +-
 .../execution/datasources/v2/MicroBatchScanExec.scala    |  5 ++++-
 .../sql/execution/ui/SQLAppStatusListenerSuite.scala     |  2 +-
 6 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 081fc658a2d6..556ad0c4d3f7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -130,12 +130,11 @@ default CustomMetric[] supportedCustomMetrics() {
   }

   /**
-   * Return an array of custom metrics which are collected with values at the driver side only.
+   * Returns an array of custom metrics which are collected with values at the driver side only.
    * Note that these metrics must be included in the supported custom metrics reported by
    * `supportedCustomMetrics`.
    */
-  default CustomTaskMetric[] reportCustomDriverMetrics() {
-    CustomTaskMetric[] NO_METRICS = {};
-    return NO_METRICS;
+  default CustomTaskMetric[] reportDriverMetrics() {
+    return new CustomTaskMetric[]{};
   }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
index 25c64c021133..7da5d0c83f45 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java
@@ -21,7 +21,6 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.metric.CustomMetric;
-import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

 /**
@@ -72,14 +71,4 @@ default StreamingWrite toStreaming() {
   default CustomMetric[] supportedCustomMetrics() {
     return new CustomMetric[]{};
   }
-
-  /**
-   * Return an array of custom metrics which are collected with values at the driver side only.
-   * Note that these metrics must be included in the supported custom metrics reported by
-   * `supportedCustomMetrics`.
-   */
-  default CustomTaskMetric[] reportCustomDriverMetrics() {
-    CustomTaskMetric[] NO_METRICS = {};
-    return NO_METRICS;
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index cf16a81eaf95..bcb7149fd0b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -55,7 +55,7 @@ case class ContinuousScanExec(
         sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
         sparkContext.env)
       .askSync[Unit](SetReaderPartitions(partitions.size))
-    new ContinuousDataSourceRDD(
+    val inputRDD = new ContinuousDataSourceRDD(
       sparkContext,
       conf.continuousStreamingExecutorQueueSize,
       conf.continuousStreamingExecutorPollIntervalMs,
@@ -63,5 +63,7 @@ case class ContinuousScanExec(
       schema,
       readerFactory,
       customMetrics)
+    postDriverMetrics()
+    inputRDD
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 71476128e4cc..e6d7cddc71b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -169,7 +169,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
   }

   protected def postDriverMetrics(): Unit = {
-    val driveSQLMetrics = scan.reportCustomDriverMetrics().map( customTaskMetric => {
+    val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => {
       val metric = metrics(customTaskMetric.name())
       metric.set(customTaskMetric.value())
       metric
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
index cb99cd73dad4..c545b3dd50b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MicroBatchScanExec.scala
@@ -48,6 +48,9 @@ case class MicroBatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()

   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics)
+    val inputRDD = new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar,
+      customMetrics)
+    postDriverMetrics()
+    inputRDD
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index de65babbf1f7..1b4f84c68f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -1161,7 +1161,7 @@ class CustomDriverMetricScanBuilder extends SimpleScanBuilder {
     Array(new SimpleCustomDriverMetric)
   }

-  override def reportCustomDriverMetrics(): Array[CustomTaskMetric] = {
+  override def reportDriverMetrics(): Array[CustomTaskMetric] = {
     Array(new SimpleCustomDriverTaskMetric(partitionCount))
   }
 }
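Illustrative usage sketch (not part of the patches above): one way a DSv2 connector could hook into the new reportDriverMetrics() API. The metric name "plannedFiles", the classes PlannedFilesMetric, PlannedFilesTaskMetric and ReportsPlannedFiles, and the plannedFileCount field are hypothetical; only Scan, CustomMetric, CustomTaskMetric, supportedCustomMetrics() and reportDriverMetrics() come from the patch series.

import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.Scan

// Hypothetical driver-side metric: number of files planned for the scan.
class PlannedFilesMetric extends CustomMetric {
  override def name(): String = "plannedFiles"
  override def description(): String = "number of files planned on the driver"
  // Summing mirrors the aggregation used by SimpleCustomDriverMetric in the test above.
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = taskMetrics.sum.toString
}

class PlannedFilesTaskMetric(count: Long) extends CustomTaskMetric {
  override def name(): String = "plannedFiles"
  override def value(): Long = count
}

// Mixed into a concrete Scan implementation; plannedFileCount is assumed to be set on the
// driver during planning (e.g. while building input partitions), before inputRDD is created.
trait ReportsPlannedFiles extends Scan {
  @volatile protected var plannedFileCount: Long = 0L

  // The driver-only metric must also appear in supportedCustomMetrics, otherwise
  // postDriverMetrics() finds no matching SQLMetric to update.
  override def supportedCustomMetrics(): Array[CustomMetric] = Array(new PlannedFilesMetric)

  override def reportDriverMetrics(): Array[CustomTaskMetric] =
    Array(new PlannedFilesTaskMetric(plannedFileCount))
}

With something like this mixed in, BatchScanExec publishes the value through postDriverMetrics() when inputRDD is built, and it appears in the SQL UI alongside the task-level metrics.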