From c7e5083e7900a1968d93df8a340d359db2694f7a Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 22 Feb 2021 11:09:25 +0800 Subject: [PATCH 1/5] fix view resolution --- .../sql/catalyst/analysis/Analyzer.scala | 7 +++++++ .../sql/execution/SQLViewTestSuite.scala | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 29341aecc1842..79d0ec41bf6e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1115,6 +1115,13 @@ class Analyzer(override val catalogManager: CatalogManager) executeSameContext(child) } } + // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators + // inside a view maybe resolved incorrectly. + newChild.foreachUp { + case o if !o.resolved => + failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}") + case _ => + } view.copy(child = newChild) case p @ SubqueryAlias(_, view: View) => p.copy(child = resolveViews(view)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index 68e1a682562ac..61c1f6fa16a99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils { checkViewOutput(viewName, Seq(Row(2))) } } + + test("SPARK-34490 - query should fail if the view refers a dropped table") { + withTable("t") { + Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t") + val viewName = createView("testView", "SELECT * FROM t") + withView(viewName) { + // Always create a temp view in this case, not use `createView` on purpose + sql("CREATE TEMP VIEW t AS SELECT 1 AS c1") + withTempView("t") { + checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1))) + // Manually drop table `t` to see if the query will fail + sql("DROP TABLE IF EXISTS default.t") + val e = intercept[AnalysisException] { + sql(s"SELECT * FROM $viewName").collect() + }.getMessage + assert(e.contains("unresolved operator 'UnresolvedRelation [t]")) + } + } + } + } } class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession { From 2642877c983e4f0dc047a85a96569cbd995c7e46 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 22 Feb 2021 17:00:30 +0800 Subject: [PATCH 2/5] fix test --- .../spark/sql/catalyst/analysis/Analyzer.scala | 7 ++----- .../catalyst/analysis/TableLookupCacheSuite.scala | 13 ++++++++++--- .../spark/sql/execution/SQLViewTestSuite.scala | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 79d0ec41bf6e6..a85979c464835 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1117,11 +1117,7 @@ class Analyzer(override val catalogManager: CatalogManager) } // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators // inside a view maybe resolved incorrectly. - newChild.foreachUp { - case o if !o.resolved => - failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}") - case _ => - } + checkAnalysis(newChild) view.copy(child = newChild) case p @ SubqueryAlias(_, view: View) => p.copy(child = resolveViews(view)) @@ -1197,6 +1193,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTableOrView(identifier, _, _) => lookupTableOrView(identifier).getOrElse(u) + } private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala index 3e9a8b71a8fb6..ec9480514ba2d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import java.io.File +import scala.collection.JavaConverters._ + import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.connector.InMemoryTableCatalog -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table} +import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table} import org.apache.spark.sql.types._ class TableLookupCacheSuite extends AnalysisTest with Matchers { @@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers { ignoreIfExists = false) val v2Catalog = new InMemoryTableCatalog { override def loadTable(ident: Identifier): Table = { - V1Table(externalCatalog.getTable("default", ident.name)) + val catalogTable = externalCatalog.getTable("default", ident.name) + new InMemoryTable( + catalogTable.identifier.table, + catalogTable.schema, + Array.empty, + Map.empty[String, String].asJava) } override def name: String = CatalogManager.SESSION_CATALOG_NAME } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index 61c1f6fa16a99..84a20bb16ad86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -273,7 +273,7 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql(s"SELECT * FROM $viewName").collect() }.getMessage - assert(e.contains("unresolved operator 'UnresolvedRelation [t]")) + assert(e.contains("Table or view not found: t")) } } } From df0d8be1fa53c386953d750cc27ffe4706da92b9 Mon Sep 17 00:00:00 2001 From: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com> Date: Mon, 22 Feb 2021 17:49:31 +0800 Subject: [PATCH 3/5] Update Analyzer.scala --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a85979c464835..e9789b6e81cba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1193,7 +1193,6 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTableOrView(identifier, _, _) => lookupTableOrView(identifier).getOrElse(u) - } private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = { From 5c50e04f6d56d619b77142fcb5be35c97c5e4b0d Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 22 Feb 2021 23:15:47 +0800 Subject: [PATCH 4/5] fix test --- .../sql/catalyst/analysis/Analyzer.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e9789b6e81cba..4e6217900a83b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -871,24 +871,24 @@ class Analyzer(override val catalogManager: CatalogManager) object ResolveTempViews extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case u @ UnresolvedRelation(ident, _, isStreaming) => - lookupTempView(ident, isStreaming).getOrElse(u) + lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u) case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) => - lookupTempView(ident) + lookupTempView(ident, performCheck = true) .map(view => i.copy(table = view)) .getOrElse(i) case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) => - lookupTempView(ident) + lookupTempView(ident, performCheck = true) .map(view => c.copy(table = view)) .getOrElse(c) case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) => - lookupTempView(ident) + lookupTempView(ident, performCheck = true) .map(view => c.copy(table = view, isTempView = true)) .getOrElse(c) // TODO (SPARK-27484): handle streaming write commands when we have them. case write: V2WriteCommand => write.table match { case UnresolvedRelation(ident, _, false) => - lookupTempView(ident).map(EliminateSubqueryAliases(_)).map { + lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map { case r: DataSourceV2Relation => write.withNewTable(r) case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted) }.getOrElse(write) @@ -921,7 +921,9 @@ class Analyzer(override val catalogManager: CatalogManager) } def lookupTempView( - identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = { + identifier: Seq[String], + isStreaming: Boolean = false, + performCheck: Boolean = false): Option[LogicalPlan] = { // Permanent View can't refer to temp views, no need to lookup at all. if (isResolvingView && !referredTempViewNames.contains(identifier)) return None @@ -934,7 +936,7 @@ class Analyzer(override val catalogManager: CatalogManager) if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) { throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted) } - tmpView.map(ResolveRelations.resolveViews) + tmpView.map(ResolveRelations.resolveViews(_, performCheck)) } } @@ -1098,7 +1100,7 @@ class Analyzer(override val catalogManager: CatalogManager) // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name. // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names // with it, instead of current catalog and namespace. - def resolveViews(plan: LogicalPlan): LogicalPlan = plan match { + def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match { // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. @@ -1117,10 +1119,12 @@ class Analyzer(override val catalogManager: CatalogManager) } // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators // inside a view maybe resolved incorrectly. - checkAnalysis(newChild) + if (performCheck) { + checkAnalysis(newChild) + } view.copy(child = newChild) case p @ SubqueryAlias(_, view: View) => - p.copy(child = resolveViews(view)) + p.copy(child = resolveViews(view, performCheck)) case _ => plan } @@ -1140,14 +1144,14 @@ class Analyzer(override val catalogManager: CatalogManager) case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) => lookupRelation(u.multipartIdentifier, u.options, false) - .map(resolveViews) + .map(resolveViews(_, performCheck = true)) .map(EliminateSubqueryAliases(_)) .map(relation => c.copy(table = relation)) .getOrElse(c) case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) => lookupRelation(u.multipartIdentifier, u.options, false) - .map(resolveViews) + .map(resolveViews(_, performCheck = true)) .map(EliminateSubqueryAliases(_)) .map(relation => c.copy(table = relation)) .getOrElse(c) @@ -1173,7 +1177,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u: UnresolvedRelation => lookupRelation(u.multipartIdentifier, u.options, u.isStreaming) - .map(resolveViews).getOrElse(u) + .map(resolveViews(_, performCheck = true)).getOrElse(u) case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) => lookupTableOrView(identifier).map { From 37c6b9714f9499608807a39a5e646630dff8c330 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 23 Feb 2021 13:51:45 +0800 Subject: [PATCH 5/5] add more comments --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4e6217900a83b..38259c234c262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1119,6 +1119,10 @@ class Analyzer(override val catalogManager: CatalogManager) } // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators // inside a view maybe resolved incorrectly. + // But for commands like `DropViewCommand`, resolving view is unnecessary even though + // there is unresolved node. So use the `performCheck` flag to skip the analysis check + // for these commands. + // TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag if (performCheck) { checkAnalysis(newChild) }