Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
DistinctAggregationRewriter(conf) ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
Expand Down Expand Up @@ -100,13 +99,10 @@ import org.apache.spark.sql.types.IntegerType
* we could improve this in the current rule by applying more advanced expression canonicalization
* techniques.
*/
case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
object DistinctAggregationRewriter extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.resolved => p
// We need to wait until this Aggregate operator is resolved.
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case a: Aggregate => rewrite(a)
case p => p
}

def rewrite(a: Aggregate): Aggregate = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
Expand All @@ -42,7 +42,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
ComputeCurrentTime) ::
ComputeCurrentTime,
DistinctAggregationRewriter) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
""".stripMargin)
}


// Test case "intersect": passes a query that INTERSECTs table t0 with itself to checkHiveQl.
// NOTE(review): checkHiveQl is defined outside this view; what it verifies should be confirmed there.
test("intersect") {
checkHiveQl("SELECT * FROM t0 INTERSECT SELECT * FROM t0")
}
Expand Down Expand Up @@ -367,9 +366,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkHiveQl("SELECT * FROM parquet_t0 TABLESAMPLE(0.1 PERCENT) WHERE 1=0")
}

// TODO Enable this
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
ignore("multi-distinct columns") {
test("multi-distinct columns") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM parquet_t2 GROUP BY a")
}

Expand Down