Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {

if (rel.hasCommon && rel.getCommon.hasPlanId) {
plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
// scalastyle:off println

println()
println("Planner get a plan:")
println(s"$plan")
println()
}
plan
}
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ def schema(self, plan: pb2.Plan) -> StructType:
Return schema for given plan.
"""
logger.info(f"Schema for plan: {self._proto_to_string(plan)}")
print(f"Schema for plan: {self._proto_to_string(plan)}")
schema = self._analyze(method="schema", plan=plan).schema
assert schema is not None
# Server side should populate the struct field which is the schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3151,9 +3151,16 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
// Add Window operators.
val withWindow = addWindow(windowExpressions, withProject)

val planId = p.getTagValue(LogicalPlan.PLAN_ID_TAG)

// Finally, generate output columns according to the original projectList.
val finalProjectList = projectList.map(_.toAttribute)
Project(finalProjectList, withWindow)
val newProject = Project(finalProjectList, withWindow)

// retain the plan id used in Spark Connect
planId.foreach(newProject.setTagValue(LogicalPlan.PLAN_ID_TAG, _))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to fix example 2.


newProject
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ abstract class TypeCoercionBase {
object WidenSetOperationTypes extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperatorsUpWithNewOutput {
val planId = plan.getTagValue(LogicalPlan.PLAN_ID_TAG)

val newPlan = plan resolveOperatorsUpWithNewOutput {
case s @ Except(left, right, isAll) if s.childrenResolved &&
left.output.length == right.output.length && !s.resolved =>
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
Expand Down Expand Up @@ -290,6 +292,11 @@ abstract class TypeCoercionBase {
s.copy(children = newChildren) -> attrMapping
}
}

// retain the plan id used in Spark Connect
planId.foreach(newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, _))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to fix example 1


newPlan
}

/** Build new children with the widest types for each attribute among all the children */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.{BinaryLike, CurrentOrigin, LeafLike, QuaternaryLike, SQLQueryContext, TernaryLike, TreeNode, UnaryLike}
import org.apache.spark.sql.catalyst.trees.TreePattern.{RUNTIME_REPLACEABLE, TreePattern}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
Expand Down Expand Up @@ -343,8 +344,14 @@ abstract class Expression extends TreeNode[Expression] {

override def simpleString(maxFields: Int): String = toString

override def toString: String = prettyName + truncatedString(
flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
override def toString: String = {
val str = prettyName + truncatedString(
flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)
this.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
case Some(planId) => s"$str {planId=$planId}"
case _ => str
}
}

/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ abstract class LogicalPlan
}
}

override def simpleString(maxFields: Int): String = {
val str = super.simpleString(maxFields)
this.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
case Some(planId) => s"$str {planId=$planId}"
case _ => str
}
}


private[this] lazy val childAttributes = AttributeSeq.fromNormalOutput(children.flatMap(_.output))

private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ object RuleExecutor {
}
}

// scalastyle:off println
class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {

private val logLevel = SQLConf.get.planChangeLogLevel
Expand All @@ -63,6 +64,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
""".stripMargin
}

println(message)
logBasedOnLevel(message)
}
}
Expand All @@ -81,6 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
}
}

println(message)
logBasedOnLevel(message)
}
}
Expand All @@ -97,6 +100,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
|Total time of effective runs: $totalTimeEffective seconds
""".stripMargin

println(message)
logBasedOnLevel(message)
}

Expand Down