diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 988c785dbe61..e06fcd64e57e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.util.ThreadUtils
 
 /**
  * A single file that should be read, along with partition column values that
@@ -46,37 +50,80 @@ case class PartitionedFile(
  */
 case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
 
+object FileScanRDD {
+  private val ioExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))
+}
+
 class FileScanRDD(
     @transient val sqlContext: SQLContext,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
   extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
+  /**
+   * To get better interleaving of CPU and IO, this RDD will create a future to prepare the next
+   * file while the current one is being processed. `currentIterator` is the current file and
+   * `nextFile` is the future that will initialize the next file to be read. This includes things
+   * such as starting up connections to open the file and any initial buffering. The expectation
+   * is that `currentIterator` is CPU intensive and `nextFile` is IO intensive.
+   */
+  val asyncIO = sqlContext.conf.filesAsyncIO
+
+  case class NextFile(file: PartitionedFile, iter: Iterator[Object])
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
       private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
 
+      // TODO: do we need to close this?
       private[this] var currentIterator: Iterator[Object] = null
+      private[this] var nextFile: Future[NextFile] = if (asyncIO) prepareNextFile() else null
+
       def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
       def next() = currentIterator.next()
 
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
-        if (files.hasNext) {
-          val nextFile = files.next()
-          logInfo(s"Reading File $nextFile")
-          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
-          currentIterator = readFunction(nextFile)
-          hasNext
+        val file = if (asyncIO) {
+          if (nextFile == null) return false
+          // Wait for the async task to complete
+          Await.result(nextFile, Duration.Inf)
         } else {
-          SqlNewHadoopRDDState.unsetInputFileName()
-          false
+          if (!files.hasNext) return false
+          val f = files.next()
+          NextFile(f, readFunction(f))
+        }
+
+        // This is only used to evaluate the rest of the execution so we can safely set it here.
+        SqlNewHadoopRDDState.setInputFileName(file.file.filePath)
+        currentIterator = file.iter
+
+        if (asyncIO) {
+          // Asynchronously start the next file.
+          nextFile = prepareNextFile()
         }
+
+        hasNext
       }
 
       override def close() = {
         SqlNewHadoopRDDState.unsetInputFileName()
       }
+
+      def prepareNextFile() = {
+        if (files.hasNext) {
+          Future {
+            val file = files.next()
+            val it = readFunction(file)
+            // Read something from the file to trigger some initial IO.
+            it.hasNext
+            NextFile(file, it)
+          }(FileScanRDD.ioExecutionContext)
+        } else {
+          null
+        }
+      }
     }
 
     // Register an on-task-completion callback to close the input stream.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc6ba1bcfb6d..fcc251a98d92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -417,6 +416,12 @@ object SQLConf {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  val FILES_ASYNC_IO = SQLConfigBuilder("spark.sql.files.asyncIO")
+    .internal()
+    .doc("If true, attempts to asynchronously do IO when reading data.")
+    .booleanConf
+    .createWithDefault(true)
+
   val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
     .internal()
     .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
@@ -479,6 +484,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
+  def filesAsyncIO: Boolean = getConf(FILES_ASYNC_IO)
+
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   def useFileScan: Boolean = getConf(USE_FILE_SCAN)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 581095d3dc1c..a8d898bdf426 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -78,10 +78,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("basic data types (without binary)") {
-    val data = (1 to 4).map { i =>
-      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+    (true :: false :: Nil).foreach { v =>
+      withSQLConf(SQLConf.FILES_ASYNC_IO.key -> v.toString) {
+        val data = (1 to 4).map { i =>
+          (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+        }
+        checkParquetFile(data)
+      }
     }
-    checkParquetFile(data)
   }
 
   test("raw binary") {
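For context, the heart of the change is the prefetching pattern in `nextIterator()` / `prepareNextFile()` above. The snippet below is a self-contained sketch of that same pattern outside Spark, with made-up names (`PrefetchingIteratorExample`, `OpenedFile`, `openFile`, `readAll`): while the rows of the current "file" are consumed on the calling thread, the next one is opened on a small IO pool, and the consumer only blocks in `Await.result` when the prefetch has not finished yet.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PrefetchingIteratorExample {

  // Hypothetical stand-ins for PartitionedFile / readFunction in the patch: a "file" is just a
  // name, and "opening" it simulates the connection setup and initial buffering that
  // prepareNextFile() tries to overlap with CPU work.
  case class OpenedFile(name: String, rows: Iterator[Int])

  private val ioExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  private def openFile(name: String): OpenedFile = {
    Thread.sleep(50)                    // pretend this is IO: open the stream, fill the first buffer
    OpenedFile(name, Iterator(1, 2, 3)) // pretend these rows were decoded from the file
  }

  /** Iterates over the rows of all files, opening file N+1 on the IO pool while file N is read. */
  def readAll(files: Iterator[String]): Iterator[Int] = new Iterator[Int] {
    private var currentRows: Iterator[Int] = Iterator.empty
    private var nextFile: Future[OpenedFile] = prepareNextFile()

    private def prepareNextFile(): Future[OpenedFile] =
      if (files.hasNext) {
        val name = files.next() // consume the name here; only the open itself runs on the IO pool
        Future(openFile(name))(ioExecutionContext)
      } else {
        null
      }

    override def hasNext: Boolean = currentRows.hasNext || advance()
    override def next(): Int = currentRows.next()

    // Mirrors nextIterator() in the patch: wait for the prefetched file, make it current,
    // and immediately start prefetching the one after it.
    private def advance(): Boolean = {
      if (nextFile == null) return false
      val opened = Await.result(nextFile, Duration.Inf) // only blocks if the IO is not done yet
      currentRows = opened.rows
      nextFile = prepareNextFile()
      hasNext
    }
  }

  def main(args: Array[String]): Unit = {
    readAll(Iterator("a.parquet", "b.parquet", "c.parquet")).foreach(println)
    ioExecutionContext.shutdown()
  }
}
```

As in the patch, at most one file is prefetched at a time, so the overhead is bounded to one extra open reader per task; the trade-off of waiting with `Duration.Inf` is that a hung open blocks the task just as a synchronous read would.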