
Commit 0726bc5

gatorsmile authored and srowen committed
[SPARK-25674][FOLLOW-UP] Update the stats for each ColumnarBatch
What changes were proposed in this pull request?

This PR is a follow-up of #22594. This alternative avoids the unneeded computation in the hot code path.

- For the row-based scan, we keep the original way.
- For the columnar scan, we just need to update the stats after each batch.

How was this patch tested?

N/A

Closes #22731 from gatorsmile/udpateStatsFileScanRDD.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4cee191)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent d87896b commit 0726bc5
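
For context, here is a self-contained sketch of the update strategy this commit adopts. It mirrors the hot-path logic in FileScanRDD, but the names (MetricsUpdateSketch, refreshBytesRead, UpdateIntervalRecords, onRow, onColumnarBatch) are illustrative stand-ins rather than Spark's actual InputMetrics and SparkHadoopUtil APIs; treat it as an approximation of the idea, not the real code.

// Toy model of the metric-update strategy: the row path refreshes the byte
// counter only every UpdateIntervalRecords records, while the columnar path
// refreshes once per batch (cheap, since a batch covers thousands of rows).
object MetricsUpdateSketch {
  // Stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS.
  final val UpdateIntervalRecords = 1000L

  private var recordsRead = 0L
  private var bytesRead = 0L

  // Stand-in for the relatively expensive callback that sums Hadoop
  // FileSystem statistics; the point of the commit is to call this sparingly.
  private def refreshBytesRead(): Unit = {
    bytesRead = recordsRead * 8L // hypothetical byte count per record
  }

  // Row-based scan path: gate the refresh behind a cheap modulo check.
  def onRow(): Unit = {
    if (recordsRead % UpdateIntervalRecords == 0) refreshBytesRead()
    recordsRead += 1
  }

  // Columnar scan path: refresh once per batch, then count all of its rows.
  def onColumnarBatch(numRows: Int): Unit = {
    refreshBytesRead()
    recordsRead += numRows
  }

  def main(args: Array[String]): Unit = {
    (1 to 10000).foreach(_ => onRow())           // triggers 10 refreshes
    (1 to 3).foreach(_ => onColumnarBatch(4096)) // triggers 3 refreshes
    println(s"recordsRead=$recordsRead bytesRead=$bytesRead")
  }
}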

File tree

1 file changed: 8 additions, 7 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 8 additions & 7 deletions
@@ -85,7 +85,7 @@ class FileScanRDD(
       // If we do a coalesce, however, we are likely to compute multiple partitions in the same
       // task and in the same thread, in which case we need to avoid override values written by
       // previous partitions (SPARK-13071).
-      private def updateBytesRead(): Unit = {
+      private def incTaskInputMetricsBytesRead(): Unit = {
         inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
       }
 
@@ -114,15 +114,16 @@ class FileScanRDD(
         // don't need to run this `if` for every record.
         val preNumRecordsRead = inputMetrics.recordsRead
         if (nextElement.isInstanceOf[ColumnarBatch]) {
+          incTaskInputMetricsBytesRead()
           inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
         } else {
+          // too costly to update every record
+          if (inputMetrics.recordsRead %
+              SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+            incTaskInputMetricsBytesRead()
+          }
           inputMetrics.incRecordsRead(1)
         }
-        // The records may be incremented by more than 1 at a time.
-        if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
-            inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
-          updateBytesRead()
-        }
         nextElement
       }
 
@@ -210,7 +211,7 @@ class FileScanRDD(
       }
 
       override def close(): Unit = {
-        updateBytesRead()
+        incTaskInputMetricsBytesRead()
         updateBytesReadWithFileSize()
         InputFileBlockHolder.unset()
       }
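
To observe the effect of these updates from an application, the per-task input metrics that incTaskInputMetricsBytesRead() keeps current are exposed through Spark's listener API. Below is a minimal sketch; the local-mode session, the listener wiring, and the /tmp/some_table path are assumptions for illustration, not part of this commit.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Minimal probe: print bytesRead/recordsRead for every finished task of a
// file-based scan. The values come from the same InputMetrics that FileScanRDD
// now refreshes per ColumnarBatch (or per interval of rows).
object InputMetricsProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("input-metrics-probe")
      .getOrCreate()

    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val in = taskEnd.taskMetrics.inputMetrics
        println(s"task ${taskEnd.taskInfo.taskId}: " +
          s"bytesRead=${in.bytesRead} recordsRead=${in.recordsRead}")
      }
    })

    // Any file-based source is read through FileScanRDD; the path is a placeholder.
    spark.read.parquet("/tmp/some_table").count()
    spark.stop()
  }
}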

0 commit comments