Commit 6c442d1

backport to branch-2.2
1 parent 1c892c0

2 files changed: 33 additions, 8 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 18 additions & 8 deletions
@@ -170,19 +170,14 @@ case class FileSourceScanExec(
     false
   }
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
     val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
-
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+    metadataTime = timeTakenMs
     ret
   }
 
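For context, a minimal plain-Scala sketch of the pitfall this hunk addresses (illustrative names, not from the commit): selectedPartitions is a lazy val, so any metric posting inside its initializer fires exactly once, at first access, and that access can happen during planning, before an execution ID exists, in which case the posted update is lost.

object TooEarlySideEffect {
  private var executionId: Option[String] = None // None until an execution starts
  private var postedTo: Option[String] = None

  private lazy val selectedPartitions: Seq[String] = {
    postedTo = executionId // posts to "no execution" if first touched during planning
    Seq("p=0", "p=1", "p=2")
  }

  def main(args: Array[String]): Unit = {
    selectedPartitions              // planning-time access: executionId is still None
    executionId = Some("42")        // execution starts
    selectedPartitions              // lazy val body is not re-evaluated
    println(s"metrics posted to: $postedTo") // prints None: the update was lost
  }
}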
@@ -281,6 +276,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+    // Update metrics here so they take effect in both the whole-stage-codegen
+    // and the normal execution paths.
+    updateDriverMetrics()
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
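Hooking the update into inputRDD works because both execution paths, whole-stage codegen and the normal iterator path, pull their input from this one lazy val. A rough sketch of that pattern in plain Scala (names are illustrative, not the commit's code):

object OncePerNode {
  private var metricUpdates = 0
  private def updateDriverMetrics(): Unit = metricUpdates += 1

  // Both paths read the same lazy val, so the side effect at the top of its
  // initializer runs exactly once, whichever path gets there first.
  private lazy val inputRDD: Seq[Int] = {
    updateDriverMetrics()
    Seq(1, 2, 3)
  }

  def codegenPath(): Seq[Int] = inputRDD // stands in for the whole-stage-codegen path
  def normalPath(): Seq[Int] = inputRDD  // stands in for the non-codegen path

  def main(args: Array[String]): Unit = {
    codegenPath()
    normalPath()
    println(s"metric updates: $metricUpdates") // prints 1
  }
}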
@@ -514,6 +511,19 @@ case class FileSourceScanExec(
     }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this method is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override lazy val canonicalized: FileSourceScanExec = {
     FileSourceScanExec(
       relation,
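For readers unfamiliar with the driver-metric API used above, here is a standalone sketch that mirrors the updateDriverMetrics() pattern. It is an illustration under assumptions, not part of the commit: it assumes a local SparkSession with the spark-sql classes on the classpath, and the metric it creates is made up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

object DriverMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Create and bump a driver-side metric, as updateDriverMetrics() does.
    val numFiles = SQLMetrics.createMetric(sc, "number of files")
    numFiles.add(2L)

    // Posting only takes effect while a SQL execution is running, i.e. when the
    // execution-ID local property is set; outside an execution it is null and
    // the post is dropped. That is exactly why the commit defers this update
    // from selectedPartitions to inputRDD initialization.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sc, executionId, numFiles :: Nil)

    spark.stop()
  }
}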

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -443,4 +443,19 @@ object InputOutputMetricsHelper {
     }
     listener.getResults()
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+    withTable("testDataForScan") {
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").saveAsTable("testDataForScan")
+      // The execution plan only has 1 FileScan node.
+      val df = spark.sql(
+        "SELECT * FROM testDataForScan WHERE p = 1")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> (("Scan parquet default.testdataforscan", Map(
+          "number of output rows" -> 3L,
+          "number of files" -> 2L))))
+      )
+    }
+  }
 }
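As a quick sanity check on the numbers the new test asserts (a sketch assuming a local SparkSession, not part of the commit): ids 0 through 9 with p = id % 3 place ids 1, 4 and 7 in partition p=1, so "number of output rows" is 3; the expected "number of files" of 2 comes from how many tasks wrote partition p=1 in the suite's setup and is not re-derived here.

import org.apache.spark.sql.SparkSession

object TestDataSanityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("check").getOrCreate()

    // Same data as the test: ids 0..9, partitioned by p = id % 3.
    val rowsInP1 = spark.range(10).selectExpr("id", "id % 3 as p")
      .where("p = 1")
      .count()
    println(s"rows with p = 1: $rowsInP1") // prints 3 (ids 1, 4, 7)

    spark.stop()
  }
}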
