From f7fe143214ed3ad27470eec8aaa18bbe4e40e7cd Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 May 2015 23:02:32 -0700 Subject: [PATCH 1/4] Avoid unnecessary closure cleaning --- .../apache/spark/sql/sources/DataSourceStrategy.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 1615a6dcbdb2..6675ceb13c2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.sources -import org.apache.spark.Logging -import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.{Logging, TaskContext} +import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -184,7 +184,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } } - dataRows.mapPartitions { iterator => + // Since we know for sure that this closure is serializable, we can avoid the overhead + // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally + // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). + val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => { val dataTypes = requiredColumns.map(schema(_).dataType) val mutableRow = new SpecificMutableRow(dataTypes) iterator.map { dataRow => @@ -196,6 +199,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { mutableRow.asInstanceOf[expressions.Row] } } + new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) + } else { dataRows } From 523f042faf68f39c838cc51dbb7e095fc09dd6c9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 19 May 2015 12:14:45 -0700 Subject: [PATCH 2/4] Skip unnecessary Utils.getCallSites too --- .../scala/org/apache/spark/util/Utils.scala | 18 ++++++++++++++++++ .../spark/sql/sources/DataSourceStrategy.scala | 9 ++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6a7d1fae3320..b7a2473dfe92 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging { shutdownHooks.remove(ref) } + /** + * To avoid calling `Utils.getCallSite` for every single RDD we create in the body, + * set a dummy call site that RDDs use instead. This is for performance optimization. 
+ */ + def withDummyCallSite[T](sc: SparkContext)(body: => T): T = { + val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM) + val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM) + try { + sc.setLocalProperty(CallSite.SHORT_FORM, "") + sc.setLocalProperty(CallSite.LONG_FORM, "") + body + } finally { + // Restore the old ones here + sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite) + sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite) + } + } + } private [util] class SparkShutdownHookManager { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index 6675ceb13c2b..309ffd72ee24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{StringType, StructType, UTF8String} import org.apache.spark.sql.{SaveMode, Strategy, execution, sources} +import org.apache.spark.util.Utils /** * A Strategy for planning scans over data sources defined using the sources API. @@ -199,7 +200,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { mutableRow.asInstanceOf[expressions.Row] } } - new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) + + // This is an internal RDD whose call site the user should not be concerned with + // Since we create many of these (one per partition), the time spent on computing + // the call site may add up. + Utils.withDummyCallSite(dataRows.sparkContext) { + new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) + } } else { dataRows From 10f7e3e22ae79a7b5f7dc0d7fd1b22867ca1d0d4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 20 May 2015 11:40:21 -0700 Subject: [PATCH 3/4] Avoid getting call sites and cleaning closures --- .../apache/spark/sql/parquet/newParquet.scala | 98 ++++++++++--------- .../spark/sql/sources/SqlNewHadoopRDD.scala | 4 - 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c35b7eff82af..9c2310f290fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -33,6 +33,7 @@ import parquet.hadoop._ import parquet.hadoop.metadata.CompressionCodecName import parquet.hadoop.util.ContextUtil +import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD._ @@ -40,7 +41,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} -import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException} +import org.apache.spark.util.Utils private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( @@ -252,57 +253,58 @@ private[sql] class ParquetRelation2( val footers = inputFiles.map(f => metadataCache.footers(f.getPath)) - // TODO Stop using 
`FilteringParquetRowInputFormat` and overriding `getPartition`. - // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and - // footers. Especially when a global arbitrative schema (either from metastore or data source - // DDL) is available. - new SqlNewHadoopRDD( - sc = sqlContext.sparkContext, - broadcastedConf = broadcastedConf, - initDriverSideJobFuncOpt = Some(setInputPaths), - initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[FilteringParquetRowInputFormat], - keyClass = classOf[Void], - valueClass = classOf[Row]) { - - val cacheMetadata = useMetadataCache - - @transient val cachedStatuses = inputFiles.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - val pathWithAuthority = new Path(f.getPath.toUri.toString) - - new FileStatus( - f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, - f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) - }.toSeq - - @transient val cachedFooters = footers.map { f => - // In order to encode the authority of a Path containing special characters such as /, - // we need to use the string returned by the URI of the path to create a new Path. - new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) - }.toSeq - - // Overridden so we can inject our own cached files statuses. - override def getPartitions: Array[SparkPartition] = { - val inputFormat = if (cacheMetadata) { - new FilteringParquetRowInputFormat { - override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses - - override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + Utils.withDummyCallSite(sqlContext.sparkContext) { + // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. + // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and + // footers. Especially when a global arbitrative schema (either from metastore or data source + // DDL) is available. + new SqlNewHadoopRDD( + sc = sqlContext.sparkContext, + broadcastedConf = broadcastedConf, + initDriverSideJobFuncOpt = Some(setInputPaths), + initLocalJobFuncOpt = Some(initLocalJobFuncOpt), + inputFormatClass = classOf[FilteringParquetRowInputFormat], + keyClass = classOf[Void], + valueClass = classOf[Row]) { + + val cacheMetadata = useMetadataCache + + @transient val cachedStatuses = inputFiles.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + val pathWithAuthority = new Path(f.getPath.toUri.toString) + + new FileStatus( + f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, + f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority) + }.toSeq + + @transient val cachedFooters = footers.map { f => + // In order to encode the authority of a Path containing special characters such as /, + // we need to use the string returned by the URI of the path to create a new Path. + new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata) + }.toSeq + + // Overridden so we can inject our own cached files statuses. 
+ override def getPartitions: Array[SparkPartition] = { + val inputFormat = if (cacheMetadata) { + new FilteringParquetRowInputFormat { + override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses + override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters + } + } else { + new FilteringParquetRowInputFormat } - } else { - new FilteringParquetRowInputFormat - } - val jobContext = newJobContext(getConf(isDriverSide = true), jobId) - val rawSplits = inputFormat.getSplits(jobContext) + val jobContext = newJobContext(getConf(isDriverSide = true), jobId) + val rawSplits = inputFormat.getSplits(jobContext) - Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + Array.tabulate[SparkPartition](rawSplits.size) { i => + new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } } - } - }.values + }.values + } } private class MetadataCache { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala index 0c7bb6e50cd9..a74a98631da3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala @@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V]( with SparkHadoopMapReduceUtil with Logging { - if (initLocalJobFuncOpt.isDefined) { - sc.clean(initLocalJobFuncOpt.get) - } - protected def getJob(): Job = { val conf: Configuration = broadcastedConf.value.value // "new Job" will make a copy of the conf. Then, it is From a82b4513f5464fc3066ccba075975a3a58559209 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 20 May 2015 11:51:16 -0700 Subject: [PATCH 4/4] Fix style --- .../scala/org/apache/spark/sql/parquet/newParquet.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 9c2310f290fc..3c9807ff6f99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -255,9 +255,9 @@ private[sql] class ParquetRelation2( Utils.withDummyCallSite(sqlContext.sparkContext) { // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. - // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and - // footers. Especially when a global arbitrative schema (either from metastore or data source - // DDL) is available. + // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects + // and footers. Especially when a global arbitrative schema (either from metastore or data + // source DDL) is available. new SqlNewHadoopRDD( sc = sqlContext.sparkContext, broadcastedConf = broadcastedConf,
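
Taken together, the four patches apply a single pattern: when Spark itself creates an internal RDD whose transform function is known to be serializable, it can skip ClosureCleaner by constructing a MapPartitionsRDD directly instead of calling RDD.mapPartitions (which cleans its closure), and it can avoid calling Utils.getCallSite for every such RDD by wrapping the construction in Utils.withDummyCallSite. The following is only a minimal sketch of that combined pattern, not part of the patches themselves: it assumes code living in a subpackage of org.apache.spark (both MapPartitionsRDD and Utils are private[spark]), and the package, object, and method names (sketch, ClosureCleaningSketch, convertRows) are illustrative.

package org.apache.spark.sketch

import org.apache.spark.TaskContext
import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
import org.apache.spark.util.Utils

private[spark] object ClosureCleaningSketch {

  /**
   * Functionally equivalent to `dataRows.mapPartitions(...)`, but avoids the
   * per-RDD closure cleaning, and sets a dummy call site so the RDD does not
   * pay for a real call-site lookup at construction time.
   */
  def convertRows(dataRows: RDD[String]): RDD[Int] = {
    // An explicitly typed function value, not a captured closure: we know up front
    // that it is serializable, so there is nothing for ClosureCleaner to do.
    val mapPartitionsFunc =
      (_: TaskContext, _: Int, iter: Iterator[String]) => iter.map(_.length)

    // This RDD is internal, so its call site is of no interest to the user; the dummy
    // call site set here is what the new RDD picks up instead of Utils.getCallSite.
    Utils.withDummyCallSite(dataRows.sparkContext) {
      new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
    }
  }
}

The resulting RDD lineage is the same one dataRows.mapPartitions would have produced; only the bookkeeping around its construction changes. That is why the optimization pays off in the data source and Parquet scans patched above, which build many such internal RDDs (one per partition).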