From 511b7b6bbf4e22092c024dd0deb58daa6bb23b4b Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Wed, 1 Aug 2018 17:24:05 -0700 Subject: [PATCH 1/2] clean up --- .../org/apache/spark/sql/catalyst/dsl/package.scala | 1 + .../sql/catalyst/expressions/aggregate/Average.scala | 10 ++++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 89e8c998f740d..98708545c4bfc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -166,6 +166,7 @@ package object dsl { def maxDistinct(e: Expression): Expression = Max(e).toAggregateExpression(isDistinct = true) def upper(e: Expression): Expression = Upper(e) def lower(e: Expression): Expression = Lower(e) + def coalesce(args: Expression*): Expression = Coalesce(args) def sqrt(e: Expression): Expression = Sqrt(e) def abs(e: Expression): Expression = Abs(e) def star(names: String*): Expression = names match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala index 9ccf5aa092d11..f1fad770b637f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala @@ -46,7 +46,7 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate { override lazy val aggBufferAttributes = sum :: count :: Nil override lazy val initialValues = Seq( - /* sum = */ Cast(Literal(0), sumDataType), + /* sum = */ Literal(0).cast(sumDataType), /* count = */ Literal(0L) ) @@ -58,18 +58,16 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate { // If all input are nulls, count will be 0 and we will get null after the division. override lazy val evaluateExpression = child.dataType match { case _: DecimalType => - Cast( - DecimalPrecision.decimalAndDecimal(sum / Cast(count, DecimalType.LongDecimal)), - resultType) + DecimalPrecision.decimalAndDecimal(sum / count.cast(DecimalType.LongDecimal)).cast(resultType) case _ => - Cast(sum, resultType) / Cast(count, resultType) + sum.cast(resultType) / count.cast(resultType) } protected def updateExpressionsDef: Seq[Expression] = Seq( /* sum = */ Add( sum, - Coalesce(Cast(child, sumDataType) :: Cast(Literal(0), sumDataType) :: Nil)), + coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))), /* count = */ If(IsNull(child), count, count + 1L) ) From a5762d76cbb12d3da0fd4721cea90456bea2a3ef Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Wed, 1 Aug 2018 22:59:32 -0700 Subject: [PATCH 2/2] fix --- .../plans/logical/basicLogicalOperators.scala | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 13b51304d7f89..68413d7fd10f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -924,23 +924,3 @@ case class Deduplicate( override def output: Seq[Attribute] = child.output } - -/** - * A logical plan for setting a barrier of analysis. - * - * The SQL Analyzer goes through a whole query plan even most part of it is analyzed. This - * increases the time spent on query analysis for long pipelines in ML, especially. - * - * This logical plan wraps an analyzed logical plan to prevent it from analysis again. The barrier - * is applied to the analyzed logical plan in Dataset. It won't change the output of wrapped - * logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset - * will be put on the barrier, so only the new nodes created will be analyzed. - * - * This analysis barrier will be removed at the end of analysis stage. - */ -case class AnalysisBarrier(child: LogicalPlan) extends LeafNode { - override protected def innerChildren: Seq[LogicalPlan] = Seq(child) - override def output: Seq[Attribute] = child.output - override def isStreaming: Boolean = child.isStreaming - override def doCanonicalize(): LogicalPlan = child.canonicalized -}