@@ -104,12 +104,15 @@ class FileScanRDD(
        val nextElement = currentIterator.next()
        // TODO: we should have a better separation of row based and batch based scan, so that we
        // don't need to run this `if` for every record.
+       val preNumRecordsRead = inputMetrics.recordsRead
        if (nextElement.isInstanceOf[ColumnarBatch]) {
          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
        } else {
          inputMetrics.incRecordsRead(1)
        }
-       if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
Member

The original goal here is to avoid updating the metric on every record, because doing so is too expensive. I am not sure what the goal of your change is. Could you write a test case in SQLMetricsSuite?

Member

I think the issue is that at line 108 this value can be incremented by more than 1 at a time, so the count can skip over an exact multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS. If that code path is common, the metric might rarely ever get updated. The new check instead tests whether the increment pushes the value past a higher multiple of UPDATE_INPUT_METRICS_INTERVAL_RECORDS, which sounds more correct. But yes, this needs a description and ideally a small test.

+       // The records may be incremented by more than 1 at a time.
+       if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
+           inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
          updateBytesRead()
        }
        nextElement
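
To make the reviewers' point concrete, here is a small standalone sketch. It is not from the PR: `INTERVAL`, the batch size of 999, and the local counters are made-up stand-ins for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS and inputMetrics.recordsRead. It compares the old `% == 0` check with the new quotient comparison when the counter advances by a whole batch at a time.

```scala
object IntervalCrossingDemo {
  // Stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS (illustrative value only).
  val INTERVAL = 1000L

  def main(args: Array[String]): Unit = {
    var recordsRead = 0L
    var updatesByModulo = 0   // old check: recordsRead % INTERVAL == 0
    var updatesByQuotient = 0 // new check: interval bucket changed across the increment

    // Simulate columnar batches of 999 rows: the counter jumps over most exact
    // multiples of INTERVAL, so the modulo check almost never fires.
    val batchSizes = Seq.fill(100)(999L)

    for (batch <- batchSizes) {
      val pre = recordsRead
      recordsRead += batch

      if (recordsRead % INTERVAL == 0) updatesByModulo += 1
      if (pre / INTERVAL != recordsRead / INTERVAL) updatesByQuotient += 1
    }

    // With 100 batches of 999 rows (99,900 records total), the modulo check fires
    // 0 times, while the quotient check fires 99 times (once per INTERVAL boundary
    // crossed), so bytes-read would actually be refreshed at the intended cadence.
    println(s"records=$recordsRead modulo=$updatesByModulo quotient=$updatesByQuotient")
  }
}
```

The quotient form also keeps the original intent of the interval: it still triggers at most once per boundary crossed, so the update stays cheap even though it can no longer be skipped by batch-sized increments.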