Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,9 @@ class SessionCatalog(
new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
}

// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
// ----------------------------------------------
// | Methods that interact with temp views only |
// ----------------------------------------------

/**
* Create a temporary table.
Expand All @@ -343,6 +343,24 @@ class SessionCatalog(
tempTables.put(table, tableDefinition)
}

/**
* Return a temporary view exactly as it was stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
Copy link
Contributor

@clockfly clockfly Sep 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the same file, there are also interfaces like isTemporaryTable, tableExists, refreshTable, clearTempTables, which can accept a temporary view, but using the name "table" instead of "view"

I am not sure introducing getTempView and dropTempView will make the class SessionCatalog's interface simpler or more complicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have createTempView, I don't think the new APIs make SessionCatalog more complicated.

tempTables.get(formatTableName(name))
}

/**
* Drop a temporary view.
*/
def dropTempView(name: String): Unit = synchronized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the description for this function too?

tempTables.remove(formatTableName(name))
}

// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------

/**
* Rename a table.
*
Expand Down Expand Up @@ -492,14 +510,6 @@ class SessionCatalog(
tempTables.clear()
}

/**
* Return a temporary table exactly as it was stored.
* For testing only.
*/
private[catalog] def getTempTable(name: String): Option[LogicalPlan] = synchronized {
tempTables.get(formatTableName(name))
}

// ----------------------------------------------------------------------------
// Partitions
// ----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable2 = Range(1, 20, 2, 10)
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
assert(catalog.getTempTable("tbl1") == Option(tempTable1))
assert(catalog.getTempTable("tbl2") == Option(tempTable2))
assert(catalog.getTempTable("tbl3").isEmpty)
assert(catalog.getTempView("tbl1") == Option(tempTable1))
assert(catalog.getTempView("tbl2") == Option(tempTable2))
assert(catalog.getTempView("tbl3").isEmpty)
// Temporary table already exists
intercept[TempTableAlreadyExistsException] {
catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
}
// Temporary table already exists but we override it
catalog.createTempView("tbl1", tempTable2, overrideIfExists = true)
assert(catalog.getTempTable("tbl1") == Option(tempTable2))
assert(catalog.getTempView("tbl1") == Option(tempTable2))
}

test("drop table") {
Expand Down Expand Up @@ -251,11 +251,11 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be dropped first
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.getTempTable("tbl1") == None)
assert(sessionCatalog.getTempView("tbl1") == None)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If temp table does not exist, the table in the current database should be dropped
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
Expand All @@ -265,7 +265,7 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
purge = false)
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
}

Expand Down Expand Up @@ -303,17 +303,17 @@ class SessionCatalogSuite extends SparkFunSuite {
val tempTable = Range(1, 10, 2, 10)
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
assert(sessionCatalog.getTempView("tbl1") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be renamed first
sessionCatalog.renameTable(TableIdentifier("tbl1"), "tbl3")
assert(sessionCatalog.getTempTable("tbl1").isEmpty)
assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
assert(sessionCatalog.getTempView("tbl1").isEmpty)
assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is specified, temp tables are never renamed
sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), "tbl4")
assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable))
assert(sessionCatalog.getTempTable("tbl4").isEmpty)
assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
assert(sessionCatalog.getTempView("tbl4").isEmpty)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}

val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
val sessionState = df.sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = tableIdent.copy(database = Some(db))
// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)

(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
Expand All @@ -387,7 +392,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
df.sparkSession.sessionState.executePlan(cmd).toRdd
sessionState.executePlan(cmd).toRdd
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
assert(table.provider.isDefined)

val sessionState = sparkSession.sessionState
if (sessionState.catalog.tableExists(table.identifier)) {
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these three lines a util in TableIdentifier?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateDataSourceTableCommand is planned in physical plan stage. Maybe we should generate correct table identifier in logical plan stage?

// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
if (sessionState.catalog.tableExists(tableIdentWithDB)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
}
}

Expand Down Expand Up @@ -128,9 +132,11 @@ case class CreateDataSourceTableAsSelectCommand(
assert(table.provider.isDefined)
assert(table.schema.isEmpty)

val tableName = table.identifier.unquotedString
val provider = table.provider.get
val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString

val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
Expand All @@ -140,7 +146,9 @@ case class CreateDataSourceTableAsSelectCommand(

var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For

// Pass a table identifier with database part, so that `tableExists` won't check temp views

This is kind of hack to me. Do you have ways to avoid hacking table identifier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have, in my previous refactor PR: #14962

This PR aims to just fix these bugs surgically, so that it's easy to backport to 2.0

// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
Expand All @@ -165,7 +173,7 @@ case class CreateDataSourceTableAsSelectCommand(
// inserting into (i.e. using the same compression).

EliminateSubqueryAliases(
sessionState.catalog.lookupRelation(table.identifier)) match {
sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
Expand All @@ -188,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand(
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
case SaveMode.Overwrite =>
sparkSession.sql(s"DROP TABLE IF EXISTS $tableName")
sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
// Need to create the table again.
createMetastoreTable = true
}
Expand Down Expand Up @@ -230,7 +238,7 @@ case class CreateDataSourceTableAsSelectCommand(
}

// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(table.identifier)
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -284,8 +284,10 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def dropTempView(viewName: String): Unit = {
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false)
sparkSession.sessionState.catalog.getTempView(viewName).foreach { tempView =>
sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, tempView))
sessionCatalog.dropTempView(viewName)
}
}

/**
Expand Down
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2667,4 +2667,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}.limit(1).queryExecution.toRdd.count()
assert(numRecordsRead.value === 10)
}

test("CREATE TABLE USING should not fail if a same-name temp view exists") {
withTable("same_name") {
withTempView("same_name") {
spark.range(10).createTempView("same_name")
sql("CREATE TABLE same_name(i int) USING json")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
assert(spark.table("default.same_name").collect().isEmpty)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ class CatalogSuite
assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
}

test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
withTable("same_name") {
spark.range(10).write.saveAsTable("same_name")
sql("CACHE TABLE same_name")
assert(spark.catalog.isCached("default.same_name"))
spark.catalog.dropTempView("same_name")
assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
assert(spark.catalog.isCached("default.same_name"))
}
}

// TODO: add tests for the rest of them

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -464,4 +465,79 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
checkAnswer(df, spark.createDataset(expectedResult).toDF())
assert(df.schema === expectedSchema)
}

test("saveAsTable with mode Append should not fail if the table not exists " +
"but a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
assert(
spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
}
}
}

test("saveAsTable with mode Append should not fail if the table already exists " +
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
sql("CREATE TABLE same_name(id LONG) USING parquet")
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
checkAnswer(spark.table("default.same_name"), spark.range(20).toDF())
}
}
}

test("saveAsTable with mode ErrorIfExists should not fail if the table not exists " +
"but a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.ErrorIfExists).saveAsTable("same_name")
assert(
spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
}
}
}

test("saveAsTable with mode Overwrite should not drop the temp view if the table not exists " +
"but a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name")
assert(spark.sessionState.catalog.getTempView("same_name").isDefined)
assert(
spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
}
}
}

test("saveAsTable with mode Overwrite should not fail if the table already exists " +
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
sql("CREATE TABLE same_name(id LONG) USING parquet")
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
checkAnswer(spark.table("default.same_name"), spark.range(20).toDF())
}
}
}

test("saveAsTable with mode Ignore should create the table if the table not exists " +
"but a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Ignore).saveAsTable("same_name")
assert(
spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
}
}
}
}
Loading