@@ -604,7 +604,7 @@ class Analyzer(

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+i.copy(table = lookupTableFromCatalog(u).canonicalized)
case u: UnresolvedRelation => resolveRelation(u)
}

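Note: this change (and every hunk below) rests on a single premise: calling `canonicalized` on a resolved plan strips `SubqueryAlias` wrappers the way the explicit `EliminateSubqueryAliases` rule does, while also normalizing cosmetic state such as expression IDs. A minimal sketch of that premise using the Catalyst test DSL (the exact `canonicalized` contract is the assumption here, not something this diff shows):

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation('a.int, 'b.int)
val aliased  = relation.subquery('t)  // wraps relation in a SubqueryAlias

// The explicit rule does exactly one thing: strip alias nodes.
val stripped = EliminateSubqueryAliases(aliased)

// Under this PR's premise, canonicalization strips the alias as well,
// and may additionally normalize expression IDs and other cosmetic state.
val canonical = aliased.canonicalized
```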
@@ -377,7 +377,7 @@ package object dsl {
RepartitionByExpression(exprs, logicalPlan, numPartitions = n)

def analyze: LogicalPlan =
-EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
+analysis.SimpleAnalyzer.execute(logicalPlan).canonicalized
}
}
}
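Since the DSL's `analyze` now canonicalizes, plans built in tests come back free of `SubqueryAlias` nodes; that is what lets the optimizer suites below compare against `correctAnswer` directly. A small illustration using the usual test-DSL helpers:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val testRelation = LocalRelation('a.int, 'b.int)

// `analyze` runs SimpleAnalyzer and then canonicalizes, so the alias
// introduced by `subquery` is already gone from the expected plan.
val expected = testRelation.subquery('x).select('a).analyze
// Before this PR the same expectation needed explicit stripping:
//   EliminateSubqueryAliases(testRelation.subquery('x).select('a).analyze)
```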
@@ -48,8 +48,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
-// However, because we also use the analyzer to canonicalized queries (for view definition),
-// we do not eliminate subqueries or compute current time in the analyzer.
+// However, because we always do eager analysis in Dataset, we do not eliminate subqueries
+// or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateView,
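For reference, the rule being kept in the Finish Analysis batch is tiny; this is a sketch paraphrased from the Catalyst source of this era (the `SubqueryAlias` pattern's arity differs across Spark versions):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule

// Removes SubqueryAlias operators from the plan. Subqueries only provide
// attribute scoping during analysis, so they can be dropped afterwards.
object EliminateSubqueryAliases extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case SubqueryAlias(_, child) => child
  }
}
```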
@@ -246,7 +246,7 @@ class ColumnPruningSuite extends PlanTest {
x.select('a)
.sortBy(SortOrder('a, Ascending)).analyze

-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)

// push down invalid
val originalQuery1 = {
@@ -261,7 +261,7 @@ class ColumnPruningSuite extends PlanTest {
.sortBy(SortOrder('a, Ascending))
.select('b).analyze

-comparePlans(optimized1, analysis.EliminateSubqueryAliases(correctAnswer1))
+comparePlans(optimized1, correctAnswer1)
}

test("Column pruning on Window with useless aggregate functions") {
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -34,12 +33,6 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
val batches = Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) :: Nil
}

-private def assertEquivalent(e1: Expression, e2: Expression): Unit = {
-val correctAnswer = Project(Alias(e2, "out")() :: Nil, OneRowRelation).analyze
-val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, OneRowRelation).analyze)
-comparePlans(actual, correctAnswer)
-}
-
private def afterOptimization(plan: LogicalPlan): LogicalPlan = {
Optimize.execute(analysis.SimpleAnalyzer.execute(plan))
}
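With the expression-level `assertEquivalent` helper gone, the remaining tests in this suite drive the rule through `afterOptimization` alone. A representative case as it would appear inside the suite, sketched (`comparePlans` comes from `PlanTest`, and the `SubqueryAlias` constructor arity varies across versions):

```scala
test("eliminate top level subquery") {
  val input = LocalRelation('a.int, 'b.int)
  val query = SubqueryAlias("a", input)
  comparePlans(afterOptimization(query), input)
}
```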
@@ -449,7 +449,7 @@ class FilterPushdownSuite extends PlanTest {
}
val optimized = Optimize.execute(originalQuery.analyze)

-comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized)
+comparePlans(originalQuery.analyze, optimized)
}

test("joins: conjunctive predicates") {
@@ -468,7 +468,7 @@
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze

-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

test("joins: conjunctive predicates #2") {
@@ -487,7 +487,7 @@
left.join(right, condition = Some("x.b".attr === "y.b".attr))
.analyze

-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

test("joins: conjunctive predicates #3") {
@@ -511,7 +511,7 @@
condition = Some("z.a".attr === "x.b".attr))
.analyze

-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

test("joins: push down where clause into left anti join") {
@@ -526,7 +526,7 @@
x.where("x.a".attr > 10)
.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
.analyze
-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

test("joins: only push down join conditions to the right of a left anti join") {
@@ -543,7 +543,7 @@
LeftAnti,
Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
.analyze
-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

test("joins: only push down join conditions to the right of an existence join") {
@@ -561,7 +561,7 @@
ExistenceJoin(fillerVal),
Some("x.a".attr > 1))
.analyze
-comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+comparePlans(optimized, correctAnswer)
}

val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
@@ -119,7 +119,7 @@ class JoinOptimizationSuite extends PlanTest {

queryAnswers foreach { queryAnswerPair =>
val optimized = Optimize.execute(queryAnswerPair._1.analyze)
-comparePlans(optimized, analysis.EliminateSubqueryAliases(queryAnswerPair._2.analyze))
+comparePlans(optimized, queryAnswerPair._2.analyze)
}
}

@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.DDLUtils
@@ -353,8 +353,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
relation.catalogTable.identifier
}

-val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
-EliminateSubqueryAliases(tableRelation) match {
+val tableRelation =
+df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized
+tableRelation match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
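The `insertInto` path now canonicalizes the target table's analyzed plan before the self-insert check, so a `SubqueryAlias` wrapper cannot hide a `LogicalRelation` that also appears on the read side. A sketch of the surrounding logic under that assumption (the exception message wording here is illustrative, not quoted from the source):

```scala
// Relations the query reads from:
val srcRelations = df.logicalPlan.collect {
  case LogicalRelation(src: BaseRelation, _, _) => src
}

// Canonicalize the write target before comparing against the read side.
val tableRelation =
  df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized

tableRelation match {
  case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
    // Reading and writing the same relation is a self-insert, which is invalid.
    throw new AnalysisException(s"Cannot insert into table $tableIdentWithDB from itself")
  case _ => // not a self-insert; continue with the remaining checks
}
```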
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -40,8 +39,7 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-val relation =
-EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
+val relation = sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized

// Compute total size
val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SessionState
@@ -41,8 +40,7 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-val relation =
-EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
+val relation = sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized

relation match {
case relation: CatalogRelation =>
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -518,7 +518,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withSQLConf(
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
-relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+relation = spark.table(tableName).queryExecution.analyzed.canonicalized
}
val catalogTable =
sessionState.catalog.getTableMetadata(TableIdentifier(tableName))