diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6fb41b6425c4..1f6a7c089bf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -170,19 +170,14 @@ case class FileSourceScanExec(
     false
   }
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    metadataTime = timeTakenMs
     ret
   }
 
@@ -281,6 +276,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics here so they take effect in both the whole-stage codegen and normal code paths.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -514,6 +511,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this is called, selectedPartitions has
+   * already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics(): Unit = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override lazy val canonicalized: FileSourceScanExec = {
     FileSourceScanExec(
       relation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 79d1fbfa3f07..26b822fe6208 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -372,6 +372,21 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil)
     }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }
 
 object InputOutputMetricsHelper {
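For reference, a minimal sketch (not part of the patch) of how the fixed metrics can be checked interactively. It mirrors the scenario from the new test case and assumes a default spark-shell session; the table name testDataForScan is illustrative only.

    // Create a small partitioned table, as in the new SQLMetricsSuite test.
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").saveAsTable("testDataForScan")

    // Running an action builds inputRDD, which now calls updateDriverMetrics()
    // and posts the driver-side metrics for the scan node.
    spark.sql("SELECT * FROM testDataForScan WHERE p = 1").collect()

    // The scan node in the SQL UI should now report "number of files" and
    // "metadata time" for both the whole-stage codegen and normal code paths.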