Scan.java
@@ -19,6 +19,7 @@
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
@@ -127,4 +128,13 @@ default ContinuousStream toContinuousStream(String checkpointLocation) {
   default CustomMetric[] supportedCustomMetrics() {
     return new CustomMetric[]{};
   }
+
+  /**
+   * Returns an array of custom metrics which are collected with values at the driver side only.
+   * Note that these metrics must be included in the supported custom metrics reported by
+   * `supportedCustomMetrics`.
+   */
+  default CustomTaskMetric[] reportDriverMetrics() {
+    return new CustomTaskMetric[]{};
+  }
 }
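As an editorial illustration of how a connector might use this new hook (a sketch only, not part of this PR; the ExampleScan and PlannedFilesMetric names and the plannedFiles value are made up), a Scan pairs a driver-only CustomTaskMetric with a declared CustomMetric of the same name:

import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

// Hypothetical driver-side metric; reported values are summed on the driver for display.
class PlannedFilesMetric extends CustomMetric {
  override def name(): String = "plannedFiles"
  override def description(): String = "number of files planned at the driver"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = taskMetrics.sum.toString
}

// Hypothetical scan: the planned-file count is known only on the driver, so it is
// surfaced via reportDriverMetrics() instead of per-task metrics.
class ExampleScan(schema: StructType, plannedFiles: Long) extends Scan {
  override def readSchema(): StructType = schema

  // Every metric returned by reportDriverMetrics() must also be declared here.
  override def supportedCustomMetrics(): Array[CustomMetric] = Array(new PlannedFilesMetric)

  override def reportDriverMetrics(): Array[CustomTaskMetric] = Array(new CustomTaskMetric {
    override def name(): String = "plannedFiles"
    override def value(): Long = plannedFiles
  })
}

The metric name must match an entry from supportedCustomMetrics(), since the driver-side value is routed into the corresponding SQL metric by postDriverMetrics() in the scan exec nodes below.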
BatchScanExec.scala
@@ -108,13 +108,15 @@ case class BatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
 
   override lazy val inputRDD: RDD[InternalRow] = {
-    if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
+    val rdd = if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
       // return an empty RDD with 1 partition if dynamic filtering removed the only split
       sparkContext.parallelize(Array.empty[InternalRow], 1)
     } else {
       new DataSourceRDD(
         sparkContext, filteredPartitions, readerFactory, supportsColumnar, customMetrics)
     }
+    postDriverMetrics()

Review comments on `postDriverMetrics()`:

Member: Except for BatchScanExec, there seem to be other places where we need to do this too, e.g. MicroBatchScanExec and ContinuousScanExec, which are based on DataSourceV2ScanExecBase.

viirya (Member), Jul 20, 2022: For the writer, there are V2TableWriteExec, V2ExistingTableWriteExec, and WriteToContinuousDataSourceExec.

Contributor: +1

Author (Contributor): @viirya @cloud-fan I will create a follow-up PR for the write paths. As for the read paths, I have updated ContinuousScanExec and MicroBatchScanExec similarly to BatchScanExec. Can you please point me to a test case that I can run to verify that the behavior is as expected?

+    rdd
   }
 
   override def doCanonicalize(): BatchScanExec = {
ContinuousScanExec.scala
@@ -55,13 +55,15 @@ case class ContinuousScanExec(
       sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       sparkContext.env)
       .askSync[Unit](SetReaderPartitions(partitions.size))
-    new ContinuousDataSourceRDD(
+    val inputRDD = new ContinuousDataSourceRDD(
       sparkContext,
       conf.continuousStreamingExecutorQueueSize,
       conf.continuousStreamingExecutorPollIntervalMs,
       partitions.map(_.head),
       schema,
       readerFactory,
       customMetrics)
+    postDriverMetrics()
+    inputRDD
   }
 }
DataSourceV2ScanExecBase.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
-import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
+import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SQLExecution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SupportsMetadata
@@ -168,6 +168,18 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
     }
   }
+
+  protected def postDriverMetrics(): Unit = {
+    val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => {
+      val metric = metrics(customTaskMetric.name())

Review comments on `metrics(customTaskMetric.name())`:

Member: Hmm, what if the driver metric is not in `metrics`? Will this throw a NoSuchElementException?

viirya (Member), Jul 21, 2022: Yeah, I think so. Considering this is implemented by the developers of DS v2 data sources and not by end users, I don't think this would happen; otherwise it would be caught during development.

(A more lenient variant of this lookup is sketched after this file's diff.)

+      metric.set(customTaskMetric.value())
+      metric
+    })
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      driveSQLMetrics)
+  }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
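On the NoSuchElementException question in the review thread above: a more lenient variant of postDriverMetrics() could simply drop driver metrics that were not declared up front. This is only a sketch of that alternative, not what the PR implements; the postDriverMetricsLenient name is made up, and it assumes the trait's existing scan, metrics, and sparkContext members:

// Sketch only: skip driver metrics whose names were never declared via
// supportedCustomMetrics(), instead of failing with NoSuchElementException
// on the metrics map lookup.
protected def postDriverMetricsLenient(): Unit = {
  val driverSQLMetrics = scan.reportDriverMetrics().flatMap { customTaskMetric =>
    metrics.get(customTaskMetric.name()).map { metric =>
      metric.set(customTaskMetric.value())
      metric
    }
  }
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSQLMetrics)
}

The PR instead treats an undeclared metric name as a developer error to be caught during development, matching the reasoning in the thread.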
MicroBatchScanExec.scala
@@ -48,6 +48,9 @@ case class MicroBatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
 
   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics)
+    val inputRDD = new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar,
+      customMetrics)
+    postDriverMetrics()
+    inputRDD
   }
 }
SQLAppStatusListenerSuite.scala
@@ -873,6 +873,41 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
assert(metrics(innerMetric.id) === expectedInnerValue)
}

test("SPARK-39635: Report driver metrics from Datasource v2 scan") {
val statusStore = spark.sharedState.statusStore
val oldCount = statusStore.executionsList().size

val schema = new StructType().add("i", "int").add("j", "int")
val physicalPlan = BatchScanExec(schema.toAttributes, new CustomDriverMetricScanBuilder(),
Seq.empty)
val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
override lazy val sparkPlan = physicalPlan
override lazy val executedPlan = physicalPlan
}

SQLExecution.withNewExecutionId(dummyQueryExecution) {
physicalPlan.execute().collect()
}

// Wait until the new execution is started and being tracked.
while (statusStore.executionsCount() < oldCount) {
Thread.sleep(100)
}

// Wait for listener to finish computing the metrics for the execution.
while (statusStore.executionsList().isEmpty ||
statusStore.executionsList().last.metricValues == null) {
Thread.sleep(100)
}

val execId = statusStore.executionsList().last.executionId
val metrics = statusStore.executionMetrics(execId)
val expectedMetric = physicalPlan.metrics("custom_driver_metric_partition_count")
val expectedValue = "2"
assert(metrics.contains(expectedMetric.id))
assert(metrics(expectedMetric.id) === expectedValue)
}

test("SPARK-36030: Report metrics from Datasource v2 write") {
withTempDir { dir =>
val statusStore = spark.sharedState.statusStore
@@ -1037,6 +1072,19 @@ class SimpleCustomMetric extends CustomMetric {
}
}

class SimpleCustomDriverMetric extends CustomMetric {
override def name(): String = "custom_driver_metric_partition_count"
override def description(): String = "Simple custom driver metrics - partition count"
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
taskMetrics.sum.toString
}
}

class SimpleCustomDriverTaskMetric(value : Long) extends CustomTaskMetric {
override def name(): String = "custom_driver_metric_partition_count"
override def value(): Long = value
}

class BytesWrittenCustomMetric extends CustomMetric {
override def name(): String = "bytesWritten"
override def description(): String = "bytesWritten metric"
@@ -1096,6 +1144,28 @@ class CustomMetricScanBuilder extends SimpleScanBuilder {
override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory
}

class CustomDriverMetricScanBuilder extends SimpleScanBuilder {

var partitionCount: Long = 0L

override def planInputPartitions(): Array[InputPartition] = {
val partitions: Array[InputPartition] = Array(RangeInputPartition(0, 5),
RangeInputPartition(5, 10))
partitionCount = partitions.length
partitions
}

override def createReaderFactory(): PartitionReaderFactory = CustomMetricReaderFactory

override def supportedCustomMetrics(): Array[CustomMetric] = {
Array(new SimpleCustomDriverMetric)
}

override def reportDriverMetrics(): Array[CustomTaskMetric] = {
Array(new SimpleCustomDriverTaskMetric(partitionCount))
}
}

class CustomMetricsCSVDataWriter(fs: FileSystem, file: Path) extends CSVDataWriter(fs, file) {
override def currentMetricsValues(): Array[CustomTaskMetric] = {
val metric = new CustomTaskMetric {