diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 56a3dd02f9ba..e8e020176ce4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -53,9 +53,21 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
    */
   protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next()
 
+  /**
+   * This is used to establish a link between physical and logical plan nodes. The default
+   * implementation is a no-op.
+   */
+  protected def attachLogicalPlan(
+      physicalPlans: Seq[PhysicalPlan],
+      logicalPlan: LogicalPlan): Seq[PhysicalPlan] = {
+    physicalPlans
+  }
+
   def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
     // Obviously a lot to do here still...
-    val iter = strategies.view.flatMap(_(plan)).toIterator
+    val iter = strategies.view.flatMap { strategy =>
+      attachLogicalPlan(strategy(plan), plan)
+    }.toIterator
     assert(iter.hasNext, s"No plan for $plan")
     iter
   }
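
The hook above is deliberately a no-op in the base QueryPlanner, so existing planners keep their behavior; only subclasses that override it do any extra work. As a minimal sketch of what an override can observe (the CountingPlanner class below is hypothetical, not part of this patch), a planner sees every batch of candidate physical plans together with the logical node they were generated from:

// Hypothetical sketch, not part of this patch: count how many candidate
// physical plans the strategies emit for each logical node encountered.
import scala.collection.mutable

import org.apache.spark.sql.catalyst.planning.QueryPlanner
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode

abstract class CountingPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]]
  extends QueryPlanner[PhysicalPlan] {

  private val candidateCounts = mutable.Map.empty[String, Int].withDefaultValue(0)

  override protected def attachLogicalPlan(
      physicalPlans: Seq[PhysicalPlan],
      logicalPlan: LogicalPlan): Seq[PhysicalPlan] = {
    // plan() invokes this once per strategy, so counts accumulate across strategies.
    candidateCounts(logicalPlan.nodeName) += physicalPlans.size
    physicalPlans  // pass the candidates through unchanged
  }
}
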
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b19b772409d8..c32c965919d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
 import org.apache.spark.sql.types.DataType
@@ -47,6 +48,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
+  @transient
+  private[this] var _logicalPlan: Option[LogicalPlan] = None
+
+  def logicalPlan: Option[LogicalPlan] = _logicalPlan
+
   // sqlContext will be null when we are being deserialized on the slaves. In this instance
   // the value of subexpressionEliminationEnabled will be set by the desserializer after the
   // constructor has run.
@@ -61,10 +67,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   private val prepareCalled = new AtomicBoolean(false)
 
+  def withLogicalPlan(logicalPlanOpt: Option[LogicalPlan]): SparkPlan = {
+    _logicalPlan = logicalPlanOpt
+    this
+  }
+
+  def withLogicalPlan(logicalPlan: LogicalPlan): SparkPlan = {
+    _logicalPlan = Some(logicalPlan)
+    this
+  }
+
   /** Overridden make copy also propogates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
     SQLContext.setActive(sqlContext)
-    super.makeCopy(newArgs)
+    super.makeCopy(newArgs).withLogicalPlan(_logicalPlan)
+  }
+
+  override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = {
+    super.withNewChildren(newChildren).withLogicalPlan(_logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 6e9a4df82824..3bb582727e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 
 class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
@@ -83,4 +84,11 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
       Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
     }
   }
+
+  override protected def attachLogicalPlan(
+      physicalPlans: Seq[SparkPlan],
+      logicalPlan: LogicalPlan): Seq[SparkPlan] = {
+    physicalPlans.map(_.withLogicalPlan(logicalPlan))
+  }
+
 }
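
Taken together: the strategies produce candidate SparkPlans, SparkPlanner.attachLogicalPlan tags each of them via withLogicalPlan, and the makeCopy/withNewChildren overrides keep the tag alive across later tree rewrites. A hedged usage sketch follows (the describeProvenance helper is hypothetical; nodes constructed outside the planner, such as exchanges inserted by preparation rules, may still report None):

// Hypothetical sketch, not part of this patch: walk a planned physical tree
// and print the logical node each physical operator traces back to, if any.
import org.apache.spark.sql.execution.SparkPlan

def describeProvenance(planned: SparkPlan): Unit = {
  planned.foreach { physicalNode =>
    physicalNode.logicalPlan match {
      case Some(logical) => println(s"${physicalNode.nodeName} <- ${logical.nodeName}")
      case None => println(s"${physicalNode.nodeName} <- (no logical plan attached)")
    }
  }
}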