@@ -89,14 +89,6 @@ class FileScanRDD(
inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}

// If we can't get the bytes read from the FS stats, fall back to the file size,
// which may be inaccurate.
private def updateBytesReadWithFileSize(): Unit = {
if (currentFile != null) {
inputMetrics.incBytesRead(currentFile.length)
}
}

private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
@@ -139,7 +131,6 @@ class FileScanRDD(

/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
updateBytesReadWithFileSize()
if (files.hasNext) {
currentFile = files.next()
logInfo(s"Reading File $currentFile")
@@ -208,7 +199,6 @@ class FileScanRDD(

override def close(): Unit = {
updateBytesRead()
updateBytesReadWithFileSize()
InputFileBlockHolder.unset()
}
}
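The fallback removed above added currentFile.length on top of the bytes that updateBytesRead() already pulls from Hadoop's per-thread filesystem statistics, which is how the metric ended up roughly doubled for files the scan had already read (see the review discussion below). A minimal sketch, not Spark's internal code, of what that statistics-based path measures:

import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

// Sketch only: sum of the bytes read on the current thread across all
// FileSystem instances. This is the Hadoop statistic that Spark's bytes-read
// callback samples; adding currentFile.length on top of it counts a fully
// consumed file twice.
def threadBytesRead(): Long =
  FileSystem.getAllStatistics.asScala
    .map(_.getThreadStatistics.getBytesRead)
    .sum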
@@ -20,10 +20,13 @@ package org.apache.spark.sql
import java.io.{File, FileNotFoundException}
import java.util.Locale

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -473,6 +476,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
}

test("SPARK-25237 compute correct input metrics in FileScanRDD") {
withTempPath { p =>
val path = p.getAbsolutePath
spark.range(1000).repartition(1).write.csv(path)
val bytesReads = new mutable.ArrayBuffer[Long]()
val bytesReadListener = new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
}
}
sparkContext.addSparkListener(bytesReadListener)
try {
spark.read.csv(path).limit(1).collect()
sparkContext.listenerBus.waitUntilEmpty(1000L)
assert(bytesReads.sum === 7860)
srowen (Member) commented on Sep 5, 2018:

So the sum should be 10*2 + 90*3 + 900*4 = 3890. That's the size of the CSV file that's written, too, when I try it locally. When I run this code without the change here, I get 7820 + 7820 = 15640. So this is better! But I wonder why it ends up thinking it reads about twice the bytes?
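A quick check of that arithmetic (a sketch, assuming each value of range(1000) is written as its decimal digits plus a newline, with no header or quoting):

// Values 0-9 take 2 bytes each (1 digit + '\n'), 10-99 take 3, 100-999 take 4:
// 10*2 + 90*3 + 900*4 = 20 + 270 + 3600 = 3890 bytes.
val expectedCsvBytes = (0 until 1000).map(_.toString.length + 1).sum
assert(expectedCsvBytes == 3890)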

Member Author commented:

In this test, Spark runs with local[2] and each scan thread points to the same CSV file. Since each thread gets the file size through the Hadoop APIs, the total bytesRead becomes 2 * the file size, IIUC.

srowen (Member) commented:

7860 / 2 = 3930, which is 40 bytes more than expected, but I'm willing to believe there's a good reason for that somewhere in how it gets read. Clearly it's much better than the answer of 15640, so I'm willing to believe this is fixing something.

Member Author commented:

Yea, actually the file size is 3890, but the Hadoop API (FileSystem.getAllStatistics) reports that number as 3930. I didn't look into the Hadoop code yet, so I don't get why. I'll dig into it later.

Contributor commented:

> In this test, Spark runs with local[2] and each scan thread points to the same CSV file. Since each thread gets the file size through the Hadoop APIs, the total bytesRead becomes 2 * the file size, IIUC.

I'm afraid that's not the case: CSV infers the schema first, which tries to load the first row in the path, and only then does the actual read. That's why the input bytes read is doubled. It may be more reasonable to just write and read a text file.

As for 3930 = 3890 + 40, the extra 40 bytes is the size of the .crc file; Hadoop uses ChecksumFileSystem internally.

And one more thing: this test case may be inaccurate. If the task completes successfully, all the data is consumed, updateBytesReadWithFileSize is a no-op, and updateBytesRead() in the close function will update the correct size.
FYI @maropu
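Two quick ways to probe those explanations against the test above (sketches only; they reuse path and this suite's imports, and the column name value is arbitrary):

// (a) With an explicit schema the CSV reader needs no inference pass over the
//     file, so the scan should contribute to bytesRead only once.
spark.read.schema("value STRING").csv(path).limit(1).collect()

// (b) Hadoop's local ChecksumFileSystem writes hidden .crc files next to the
//     data, which is where the extra ~40 bytes per read would come from.
new File(path).listFiles().sortBy(_.getName).foreach { f =>
  println(f"${f.length}%8d ${f.getName}")
}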

Member Author commented:

Ah, I see. Can you make a PR to fix that?
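One possible shape for such a follow-up (a sketch only, not necessarily the change that eventually landed; it reuses this suite's helpers and imports, e.g. withTempPath, mutable, and the listener classes): write plain text so there is no inference pass, read it back fully, and compare the metric with the size of the files on disk instead of a hard-coded constant.

withTempPath { dir =>
  val path = dir.getAbsolutePath
  // Write as text so the read side needs no schema inference pass.
  spark.range(1000).selectExpr("cast(id as string)").repartition(1).write.text(path)
  val bytesReads = new mutable.ArrayBuffer[Long]()
  val listener = new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    }
  }
  sparkContext.addSparkListener(listener)
  try {
    spark.read.text(path).collect()
    sparkContext.listenerBus.waitUntilEmpty(1000L)
    // Size of the visible data files; hidden .crc and _SUCCESS files excluded.
    val written = new File(path).listFiles()
      .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
      .map(_.length).sum
    assert(bytesReads.sum >= written)
  } finally {
    sparkContext.removeSparkListener(listener)
  }
}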

} finally {
sparkContext.removeSparkListener(bytesReadListener)
}
}
}
}

object TestingUDT {