[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data sources #11514
```
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
-    override val metadata: Map[String, String] = Map.empty,
-    isUnsafeRow: Boolean = false,
-    override val outputPartitioning: Partitioning = UnknownPartitioning(0))
+    override val nodeName: String) extends LeafNode {

   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     rdd.mapPartitionsInternal { iter =>
       val proj = UnsafeProjection.create(schema)
       iter.map { r =>
         numOutputRows += 1
         proj(r)
       }
     }
   }

   override def simpleString: String = {
     s"Scan $nodeName${output.mkString("[", ",", "]")}"
   }
 }
```
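The simplified `PhysicalRDD` above always runs an `UnsafeProjection` over its input, so the node's output rows are guaranteed to be in the `UnsafeRow` format. As a rough illustration of what `UnsafeProjection.create(schema)` followed by `proj(r)` does, here is a small standalone sketch against Spark's internal Catalyst API (a sketch only; it assumes spark-catalyst on the classpath and the internal API of roughly this era, and is not part of this diff):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionSketch extends App {
  val schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType)))

  // The same call PhysicalRDD.doExecute makes once per partition.
  val toUnsafe = UnsafeProjection.create(schema)

  // A generic (non-unsafe) InternalRow, like one produced by an arbitrary RDD[InternalRow].
  val generic = InternalRow(1L, UTF8String.fromString("spark"))

  // The projection copies the fields into the UnsafeRow binary format. Note that the
  // returned row is reused across calls, so it must be copied if it is retained.
  val unsafe = toUnsafe(generic)
  println(s"${unsafe.getLong(0)}, ${unsafe.getUTF8String(1)}")
}
```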
```
 /** Physical plan node for scanning data from a relation. */
 private[sql] case class DataSourceScan(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val metadata: Map[String, String] = Map.empty)
   extends LeafNode with CodegenSupport {

   override val nodeName: String = relation.toString

   // Ignore rdd when checking results
   override def sameResult(plan: SparkPlan): Boolean = plan match {
     case other: DataSourceScan => relation == other.relation && metadata == other.metadata
```
Contributor: this is actually wrong because we cannot ignore the rdd, otherwise scans of different partitions are treated as "sameResult"!
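To make that concern concrete, here is a self-contained toy analogue (the `Fake*` names are invented for illustration and are not Spark classes): an equality check that compares only the relation and the metadata treats two scans backed by different RDDs as interchangeable.

```scala
// Hypothetical stand-ins for the real plan and relation types, for illustration only.
case class FakeRelation(path: String)
case class FakeScan(relation: FakeRelation, metadata: Map[String, String], rddId: Int) {
  // Mirrors the sameResult above: the rdd (here, rddId) is ignored.
  def sameResultIgnoringRdd(other: FakeScan): Boolean =
    relation == other.relation && metadata == other.metadata
}

object SameResultSketch extends App {
  val relation = FakeRelation("/data/events")
  // Two scans of the same relation that read different underlying RDDs,
  // e.g. different partitions of the data.
  val scanA = FakeScan(relation, Map.empty, rddId = 1)
  val scanB = FakeScan(relation, Map.empty, rddId = 2)
  println(scanA.sameResultIgnoringRdd(scanB)) // true, even though the data read differs
}
```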
```
     case _ => false
   }

   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

   val outputUnsafeRows = relation match {
     case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
       !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
     case _: HadoopFsRelation => true
     case _ => false
   }

   override val outputPartitioning = {
     val bucketSpec = relation match {
       // TODO: this should be closer to bucket planning.
       case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
       case _ => None
     }

     def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
       throw new AnalysisException(s"bucket column $colName not found in existing columns " +
         s"(${output.map(_.name).mkString(", ")})")
     }

     bucketSpec.map { spec =>
       val numBuckets = spec.numBuckets
       val bucketColumns = spec.bucketColumnNames.map(toAttribute)
       HashPartitioning(bucketColumns, numBuckets)
     }.getOrElse {
       UnknownPartitioning(0)
     }
   }

   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (isUnsafeRow) {
+    val unsafeRow = if (outputUnsafeRows) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
```
```
@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns2 = exprs.map(_.gen(ctx))
-    val inputRow = if (isUnsafeRow) row else null
+    val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,
       s"""
```
```
@@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD(
     }
   }

-private[sql] object PhysicalRDD {
+private[sql] object DataSourceScan {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
-
-  def createFromDataSource(
-      output: Seq[Attribute],
-      rdd: RDD[InternalRow],
-      relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-
-    val outputUnsafeRows = relation match {
-      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-        !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-      case _: HadoopFsRelation => true
-      case _ => false
-    }
-
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      val partitioning = HashPartitioning(bucketColumns, numBuckets)
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
-    }.getOrElse {
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
-    }
-  }
-}
 }
```
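After this change the companion object keeps only the metadata keys; assembling the map itself is left to the caller constructing the scan. A hedged sketch of the kind of map a planning rule might pass as `DataSourceScan`'s `metadata` argument (the paths and filter strings below are invented for illustration):

```scala
object ScanMetadataSketch extends App {
  // The two keys kept in object DataSourceScan above.
  val INPUT_PATHS = "InputPaths"
  val PUSHED_FILTERS = "PushedFilters"

  // Hypothetical values a planning rule might have at hand.
  val inputPaths = Seq("hdfs://nn/data/events/part-0", "hdfs://nn/data/events/part-1")
  val pushedFilters = Seq("IsNotNull(id)", "GreaterThan(id,100)")

  // A metadata map of the shape DataSourceScan accepts (Map[String, String]).
  val metadata: Map[String, String] = Map(
    INPUT_PATHS -> inputPaths.mkString(", "),
    PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))

  metadata.foreach { case (k, v) => println(s"$k: $v") }
}
```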
```
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
       case "Subquery" if subgraph != null =>
         // Subquery should not be included in WholeStageCodegen
         buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges)
+      case "ReusedExchange" =>
```
Author: This one was lost when fixing conflicts in the last PR (#11403).
```
+        // Point to the re-used exchange
+        val node = exchanges(planInfo.children.head)
+        edges += SparkPlanGraphEdge(node.id, parent.id)
       case name =>
         val metrics = planInfo.metrics.map { metric =>
           SQLPlanMetric(metric.name, metric.accumulatorId,
```
```
@@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph {
       } else {
         subgraph.nodes += node
       }
-      if (name == "ShuffleExchange" || name == "BroadcastExchange") {
+      if (name.contains("Exchange")) {
         exchanges += planInfo -> node
       }
```
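The registration check above is loosened from two exact node names to a substring match. A trivial standalone check of which names the new predicate accepts (node names taken from this diff):

```scala
object ExchangeNameSketch extends App {
  val names = Seq("ShuffleExchange", "BroadcastExchange", "ReusedExchange", "Project")
  // The updated check registers any node whose name contains "Exchange".
  // ReusedExchange itself is handled earlier by its own case and never reaches this check.
  names.foreach(n => println(s"$n -> ${n.contains("Exchange")}"))
}
```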
Comment: Should we override `outputPartitioning` and set it to `UnknownPartitioning(rdd.partitions.length)`?

Reply: If a partitioning is UnknownPartitioning, the number is meaningless, I think.
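To illustrate the reply, here is a small sketch against Catalyst's internal partitioning API (of roughly this vintage; treat the exact classes and signatures as an assumption): whatever partition count `UnknownPartitioning` carries, it promises nothing about how rows are clustered, so a concrete distribution requirement is never satisfied.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, UnknownPartitioning}
import org.apache.spark.sql.types.IntegerType

object UnknownPartitioningSketch extends App {
  val col = AttributeReference("a", IntegerType)()

  // The numPartitions argument does not change the answer: UnknownPartitioning
  // never satisfies a clustering requirement, regardless of the count it carries.
  println(UnknownPartitioning(0).satisfies(ClusteredDistribution(col :: Nil)))   // false
  println(UnknownPartitioning(200).satisfies(ClusteredDistribution(col :: Nil))) // false
}
```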