diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index f82cb6760404..d365ae55904a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -174,6 +174,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
     if (rel.hasCommon && rel.getCommon.hasPlanId) {
       plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
+      // scalastyle:off println
+
+      println()
+      println("Planner got a plan:")
+      println(s"$plan")
+      println()
     }
     plan
   }
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 00f2a85d602c..82185ed96997 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -823,6 +823,7 @@ def schema(self, plan: pb2.Plan) -> StructType:
         Return schema for given plan.
         """
         logger.info(f"Schema for plan: {self._proto_to_string(plan)}")
+        print(f"Schema for plan: {self._proto_to_string(plan)}")
         schema = self._analyze(method="schema", plan=plan).schema
         assert schema is not None
         # Server side should populate the struct field which is the schema.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 55433ea04b8f..ef3108485f02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3151,9 +3151,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         // Add Window operators.
         val withWindow = addWindow(windowExpressions, withProject)

+        val planId = p.getTagValue(LogicalPlan.PLAN_ID_TAG)
+
         // Finally, generate output columns according to the original projectList.
         val finalProjectList = projectList.map(_.toAttribute)
-        Project(finalProjectList, withWindow)
+        val newProject = Project(finalProjectList, withWindow)
+
+        // retain the plan id used in Spark Connect
+        planId.foreach(newProject.setTagValue(LogicalPlan.PLAN_ID_TAG, _))
+
+        newProject
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 2a1067be004e..1387124e96ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -256,7 +256,9 @@ abstract class TypeCoercionBase {
   object WidenSetOperationTypes extends Rule[LogicalPlan] {

     override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan resolveOperatorsUpWithNewOutput {
+      val planId = plan.getTagValue(LogicalPlan.PLAN_ID_TAG)
+
+      val newPlan = plan resolveOperatorsUpWithNewOutput {
         case s @ Except(left, right, isAll) if s.childrenResolved &&
             left.output.length == right.output.length && !s.resolved =>
           val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
@@ -290,6 +292,11 @@ abstract class TypeCoercionBase {
             s.copy(children = newChildren) -> attrMapping
           }
       }
+
+      // retain the plan id used in Spark Connect
+      planId.foreach(newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, _))
+
+      newPlan
     }

     /** Build new children with the widest types for each attribute among all the children */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index c2330cdb59db..6ebf0eb6eaea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, CurrentOrigin, LeafLike, QuaternaryLike, SQLQueryContext, TernaryLike, TreeNode, UnaryLike}
 import org.apache.spark.sql.catalyst.trees.TreePattern.{RUNTIME_REPLACEABLE, TreePattern}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -343,8 +344,14 @@ abstract class Expression extends TreeNode[Expression] {

   override def simpleString(maxFields: Int): String = toString

-  override def toString: String = prettyName + truncatedString(
-    flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
+  override def toString: String = {
+    val str = prettyName + truncatedString(
+      flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
+    this.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
+      case Some(planId) => s"$str {planId=$planId}"
+      case _ => str
+    }
+  }

   /**
    * Returns SQL representation of this expression.  For expressions extending [[NonSQLExpression]],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 374eb070db1c..dbeeebe8fa17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -125,6 +125,15 @@ abstract class LogicalPlan
     }
   }

+  override def simpleString(maxFields: Int): String = {
+    val str = super.simpleString(maxFields)
+    this.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
+      case Some(planId) => s"$str {planId=$planId}"
+      case _ => str
+    }
+  }
+
+
   private[this] lazy val childAttributes =
     AttributeSeq.fromNormalOutput(children.flatMap(_.output))

   private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 9d29ca1f9c6e..732d6781e6ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -45,6 +45,7 @@ object RuleExecutor {
   }
 }

+// scalastyle:off println
 class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {

   private val logLevel = SQLConf.get.planChangeLogLevel
@@ -63,6 +64,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
           """.stripMargin
        }

+        println(message)
         logBasedOnLevel(message)
       }
     }
@@ -81,6 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
        }
      }

+      println(message)
       logBasedOnLevel(message)
     }
   }
@@ -97,6 +100,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
       |Total time of effective runs: $totalTimeEffective seconds
     """.stripMargin

+    println(message)
     logBasedOnLevel(message)
   }
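
The Analyzer.scala and TypeCoercion.scala hunks above repeat the same pattern: read LogicalPlan.PLAN_ID_TAG from the plan a rule is about to rewrite and copy it onto the plan the rule returns, so the plan id assigned by the Spark Connect client survives the rewrite. A minimal sketch of that pattern as a standalone helper follows; the object and method names (PlanIdRetention, withRetainedPlanId) are hypothetical, while LogicalPlan.PLAN_ID_TAG, getTagValue and setTagValue are the existing Catalyst APIs the diff itself uses.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object PlanIdRetention {
  // Copy the Spark Connect plan id tag (if present) from the plan a rule
  // rewrites onto the plan it returns, so plan-id-based column resolution
  // can still locate the operator after the rewrite.
  def withRetainedPlanId(oldPlan: LogicalPlan, newPlan: LogicalPlan): LogicalPlan = {
    oldPlan.getTagValue(LogicalPlan.PLAN_ID_TAG)
      .foreach(newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, _))
    newPlan
  }
}

Written this way, the ExtractWindowExpressions change would reduce to returning withRetainedPlanId(p, Project(finalProjectList, withWindow)) instead of copying the tag inline.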