@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.metric;

public interface SupportCustomMetrics {

/**
* Returns an array of supported custom metrics with name and description.
* By default, it returns empty array.
*/
default CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[]{};
}

/**
* Returns an array of custom metrics which are collected with values at the driver side only.
* Note that these metrics must be included in the supported custom metrics reported by
* `supportedCustomMetrics`.
*
* @since 3.4.0
*/
default CustomTaskMetric[] reportDriverMetrics() {
return new CustomTaskMetric[]{};
}
}
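As a point of reference, a minimal Scala sketch of a connector-side implementation of these two hooks follows. Only the CustomMetric, CustomTaskMetric and SupportCustomMetrics contracts come from the code above; the metric name, the SkippedFilesMetric/SkippedFilesTaskMetric classes and the skippedFiles counter are illustrative assumptions, not part of this change.

import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric, SupportCustomMetrics}

// Illustrative metric: counts files skipped on the driver during planning.
// Since it is reported from the driver only, aggregation sees a single value.
class SkippedFilesMetric extends CustomMetric {
  override def name(): String = "number_of_skipped_files"
  override def description(): String = "number of skipped files"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = taskMetrics.sum.toString
}

// Illustrative driver-side value carrier for the metric above.
class SkippedFilesTaskMetric(metricValue: Long) extends CustomTaskMetric {
  override def name(): String = "number_of_skipped_files"
  override def value(): Long = metricValue
}

// Illustrative implementor of the new interface; a real connector would mix
// these overrides into its Scan or Write implementation.
class FileSkippingReporter extends SupportCustomMetrics {
  // Updated by the connector while planning; assumption for the sketch.
  @volatile private var skippedFiles: Long = 0L

  override def supportedCustomMetrics(): Array[CustomMetric] =
    Array(new SkippedFilesMetric)

  override def reportDriverMetrics(): Array[CustomTaskMetric] =
    Array(new SkippedFilesTaskMetric(skippedFiles))
}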
@@ -21,8 +21,7 @@

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.metric.SupportCustomMetrics;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
@@ -43,7 +42,7 @@
* @since 3.0.0
*/
@Evolving
public interface Scan {
public interface Scan extends SupportCustomMetrics {

/**
* Returns the actual schema of this data source scan, which may be different from the physical
@@ -115,25 +114,6 @@ default ContinuousStream toContinuousStream(String checkpointLocation) {
"_LEGACY_ERROR_TEMP_3149", Map.of("description", description()));
}

/**
* Returns an array of supported custom metrics with name and description.
* By default it returns empty array.
*/
default CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[]{};
}

/**
* Returns an array of custom metrics which are collected with values at the driver side only.
* Note that these metrics must be included in the supported custom metrics reported by
* `supportedCustomMetrics`.
*
* @since 3.4.0
*/
default CustomTaskMetric[] reportDriverMetrics() {
return new CustomTaskMetric[]{};
}

/**
* This enum defines how the columnar support for the partitions of the data source
* should be determined. The default value is `PARTITION_DEFINED` which indicates that each
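For the read path, a rough sketch of a Scan relying on the hooks it now inherits might look like the following. The PlannedSplitsScan class, the planned_splits metric and its value are assumptions made for illustration only; readSchema is the only method Scan itself requires.

import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

// Sketch of a Scan that reports a driver-only metric through the inherited hooks.
class PlannedSplitsScan(schema: StructType, plannedSplits: Long) extends Scan {
  override def readSchema(): StructType = schema

  override def supportedCustomMetrics(): Array[CustomMetric] = Array(new CustomMetric {
    override def name(): String = "planned_splits"
    override def description(): String = "number of input splits planned on the driver"
    override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = taskMetrics.sum.toString
  })

  override def reportDriverMetrics(): Array[CustomTaskMetric] = Array(new CustomTaskMetric {
    override def name(): String = "planned_splits"
    override def value(): Long = plannedSplits
  })
}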
@@ -23,7 +23,7 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.SupportCustomMetrics;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

/**
@@ -38,7 +38,7 @@
* @since 3.2.0
*/
@Evolving
public interface Write {
public interface Write extends SupportCustomMetrics {

/**
* Returns the description associated with this write.
@@ -68,12 +68,4 @@ default StreamingWrite toStreaming() {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3138", Map.of("description", description()));
}

/**
* Returns an array of supported custom metrics with name and description.
* By default it returns empty array.
*/
default CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[]{};
}
}
@@ -24,21 +24,19 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SQLExecution}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.SupportsMetadata
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

trait DataSourceV2ScanExecBase extends LeafExecNode {

lazy val customMetrics = scan.supportedCustomMetrics().map { customMetric =>
customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
}.toMap
lazy val customMetrics: Map[String, SQLMetric] =
SQLMetrics.createV2CustomMetrics(sparkContext, scan.supportedCustomMetrics())

override lazy val metrics = {
override lazy val metrics: Map[String, SQLMetric] = {
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) ++
customMetrics
}
@@ -191,15 +189,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
}

protected def postDriverMetrics(): Unit = {
val driveSQLMetrics = scan.reportDriverMetrics().map(customTaskMetric => {
val metric = metrics(customTaskMetric.name())
metric.set(customTaskMetric.value())
metric
})

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
driveSQLMetrics.toImmutableArraySeq)
SQLMetrics.postV2DriverMetrics(sparkContext, scan.reportDriverMetrics(), metrics)
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.sources.InsertableRelation

/**
Expand Down Expand Up @@ -62,9 +63,15 @@ sealed trait V1FallbackWriters extends LeafV2CommandExec with SupportsV1Write {
def refreshCache: () => Unit
def write: V1Write

protected val customMetrics: Map[String, SQLMetric] =
SQLMetrics.createV2CustomMetrics(sparkContext, write.supportedCustomMetrics())

override lazy val metrics: Map[String, SQLMetric] = customMetrics

override def run(): Seq[InternalRow] = {
val writtenRows = writeWithV1(write.toInsertableRelation)
refreshCache()
SQLMetrics.postV2DriverMetrics(sparkContext, write.reportDriverMetrics(), metrics)
writtenRows
}
}
@@ -330,13 +330,12 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
def write: Write

override val customMetrics: Map[String, SQLMetric] =
write.supportedCustomMetrics().map { customMetric =>
customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
}.toMap
SQLMetrics.createV2CustomMetrics(sparkContext, write.supportedCustomMetrics())

override protected def run(): Seq[InternalRow] = {
val writtenRows = writeWithV2(write.toBatch)
refreshCache()
SQLMetrics.postV2DriverMetrics(sparkContext, write.reportDriverMetrics(), metrics)
writtenRows
}
}
@@ -355,7 +354,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {

protected val customMetrics: Map[String, SQLMetric] = Map.empty

override lazy val metrics = customMetrics
override lazy val metrics: Map[String, SQLMetric] = customMetrics

protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = {
val rdd: RDD[InternalRow] = {
@@ -26,11 +26,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
import org.apache.spark.util.AccumulatorContext.internOption
import org.apache.spark.util.ArrayImplicits._

/**
* A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on
@@ -152,6 +154,25 @@ object SQLMetrics {
acc
}

def createV2CustomMetrics(
sc: SparkContext, customMetrics: Array[CustomMetric]): Map[String, SQLMetric] = {
customMetrics.map { customMetric =>
customMetric.name() -> SQLMetrics.createV2CustomMetric(sc, customMetric)
}.toMap
}

def postV2DriverMetrics(sc: SparkContext, driverMetrics: Array[CustomTaskMetric],
metrics: Map[String, SQLMetric]): Unit = {
val driveSQLMetrics = driverMetrics.map(customTaskMetric => {
val metric = metrics(customTaskMetric.name())
metric.set(customTaskMetric.value())
metric
})
val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sc, executionId,
driveSQLMetrics.toImmutableArraySeq)
}

/**
* Create a metric to report the size information (including total, min, med, max) like data size,
* spill size, etc.
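To show how the two new helpers are meant to be combined, here is a hypothetical wrapper. Only createV2CustomMetrics and postV2DriverMetrics come from the code added above (the exec nodes earlier in this diff inline the same two calls); the DriverMetricsExample object, runWithDriverMetrics and its body parameter are assumptions made for the sketch.

import org.apache.spark.SparkContext
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object DriverMetricsExample {
  // Hypothetical wrapper showing the intended call order of the two new helpers.
  def runWithDriverMetrics(sc: SparkContext, write: Write)(body: => Unit): Map[String, SQLMetric] = {
    // One SQLMetric per custom metric declared by the connector, keyed by metric name.
    val metrics = SQLMetrics.createV2CustomMetrics(sc, write.supportedCustomMetrics())
    body
    // Post the driver-side values collected by the connector as driver accumulator updates.
    SQLMetrics.postV2DriverMetrics(sc, write.reportDriverMetrics(), metrics)
    metrics
  }
}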
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, V1Scan}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.DataSourceUtils
@@ -198,6 +199,30 @@ class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with Before
SparkSession.setDefaultSession(spark)
}
}

test("SPARK-49210: report driver metrics from fallback write") {
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
try {
val session = SparkSession.builder()
.master("local[1]")
.config(V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[V1FallbackTableCatalog].getName)
.getOrCreate()
val df = session.createDataFrame(Seq((1, "p1"), (2, "p2"), (3, "p2")))
.toDF("id", "p_49210")
df.write.partitionBy("p_49210").mode("append").format(v2Format).saveAsTable("test")
val statusStore = session.sharedState.statusStore
val execId = statusStore.executionsList()
.find(x => x.metrics.exists(_.name.equals("number of written partitions")
&& x.physicalPlanDescription.contains("p_49210")))
.get.executionId
val metrics = statusStore.executionMetrics(execId)
assert(metrics.head._2 == "2")
} finally {
SparkSession.setActiveSession(spark)
SparkSession.setDefaultSession(spark)
}
}
}

class V1WriteFallbackSessionCatalogSuite
@@ -376,6 +401,8 @@ class InMemoryTableWithV1Fallback(
}

override def build(): V1Write = new V1Write {
var writtenPartitionsCount: Long = 0L

override def toInsertableRelation: InsertableRelation = {
(data: DataFrame, overwrite: Boolean) => {
assert(!overwrite, "V1 write fallbacks cannot be called with overwrite=true")
@@ -389,8 +416,17 @@
dataMap.put(partition, elements.toImmutableArraySeq)
}
}
writtenPartitionsCount = dataMap.size
}
}

override def supportedCustomMetrics(): Array[CustomMetric] = {
Array(new WrittenPartitionDriverMetric)
}

override def reportDriverMetrics(): Array[CustomTaskMetric] = {
Array(new WrittenPartitionDriverTaskMetric(writtenPartitionsCount))
}
}
}

@@ -452,3 +488,14 @@ object OnlyOnceOptimizerRule extends Rule[LogicalPlan] {
}
}
}

class WrittenPartitionDriverMetric extends CustomMetric {
override def name(): String = "number_of_written_partitions"
override def description(): String = "number of written partitions"
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = taskMetrics.sum.toString
}

class WrittenPartitionDriverTaskMetric(value : Long) extends CustomTaskMetric {
override def name(): String = "number_of_written_partitions"
override def value(): Long = value
}