Skip to content

Commit aea33bf

Browse files
dongjoon-hyun authored and hvanhovell committed
[SPARK-16458][SQL] SessionCatalog should support listColumns for temporary tables
## What changes were proposed in this pull request?

Temporary tables are used frequently, but `spark.catalog.listColumns` does not support those tables. This PR makes `SessionCatalog` support temporary table column listing.

**Before**
```scala
scala> spark.range(10).createOrReplaceTempView("t1")
scala> spark.catalog.listTables().collect()
res1: Array[org.apache.spark.sql.catalog.Table] = Array(Table[name=`t1`, tableType=`TEMPORARY`, isTemporary=`true`])
scala> spark.catalog.listColumns("t1").collect()
org.apache.spark.sql.AnalysisException: Table `t1` does not exist in database `default`.;
```

**After**
```
scala> spark.catalog.listColumns("t1").collect()
res2: Array[org.apache.spark.sql.catalog.Column] = Array(Column[name='id', description='id', dataType='bigint', nullable='false', isPartition='false', isBucket='false'])
```

## How was this patch tested?

Passes the Jenkins tests, including a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #14114 from dongjoon-hyun/SPARK-16458.

(cherry picked from commit 840853e)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
1 parent 72cf743 commit aea33bf

File tree

5 files changed

+71
-10
lines changed

5 files changed

+71
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
2222
import scala.collection.mutable
2323

2424
import org.apache.hadoop.conf.Configuration
25-
import org.apache.hadoop.fs.{FileSystem, Path}
25+
import org.apache.hadoop.fs.Path
2626

2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.sql.AnalysisException
@@ -253,9 +253,27 @@ class SessionCatalog(
253253
def getTableMetadata(name: TableIdentifier): CatalogTable = {
254254
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
255255
val table = formatTableName(name.table)
256-
requireDbExists(db)
257-
requireTableExists(TableIdentifier(table, Some(db)))
258-
externalCatalog.getTable(db, table)
256+
val tid = TableIdentifier(table)
257+
if (isTemporaryTable(name)) {
258+
CatalogTable(
259+
identifier = tid,
260+
tableType = CatalogTableType.VIEW,
261+
storage = CatalogStorageFormat.empty,
262+
schema = tempTables(table).output.map { c =>
263+
CatalogColumn(
264+
name = c.name,
265+
dataType = c.dataType.catalogString,
266+
nullable = c.nullable,
267+
comment = Option(c.name)
268+
)
269+
},
270+
properties = Map(),
271+
viewText = None)
272+
} else {
273+
requireDbExists(db)
274+
requireTableExists(TableIdentifier(table, Some(db)))
275+
externalCatalog.getTable(db, table)
276+
}
259277
}
260278

261279
/**
@@ -432,10 +450,10 @@ class SessionCatalog(
432450
def tableExists(name: TableIdentifier): Boolean = synchronized {
433451
val db = formatDatabaseName(name.database.getOrElse(currentDb))
434452
val table = formatTableName(name.table)
435-
if (name.database.isDefined || !tempTables.contains(table)) {
436-
externalCatalog.tableExists(db, table)
453+
if (isTemporaryTable(name)) {
454+
true
437455
} else {
438-
true // it's a temporary table
456+
externalCatalog.tableExists(db, table)
439457
}
440458
}
441459

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,39 @@ class SessionCatalogSuite extends SparkFunSuite {
432432
assert(catalog.tableExists(TableIdentifier("tbl3")))
433433
}
434434

435+
test("tableExists on temporary views") {
436+
val catalog = new SessionCatalog(newBasicCatalog())
437+
val tempTable = Range(1, 10, 2, 10)
438+
assert(!catalog.tableExists(TableIdentifier("view1")))
439+
assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
440+
catalog.createTempView("view1", tempTable, overrideIfExists = false)
441+
assert(catalog.tableExists(TableIdentifier("view1")))
442+
assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
443+
}
444+
445+
test("getTableMetadata on temporary views") {
446+
val catalog = new SessionCatalog(newBasicCatalog())
447+
val tempTable = Range(1, 10, 2, 10)
448+
val m = intercept[AnalysisException] {
449+
catalog.getTableMetadata(TableIdentifier("view1"))
450+
}.getMessage
451+
assert(m.contains("Table or view 'view1' not found in database 'default'"))
452+
453+
val m2 = intercept[AnalysisException] {
454+
catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
455+
}.getMessage
456+
assert(m2.contains("Table or view 'view1' not found in database 'default'"))
457+
458+
catalog.createTempView("view1", tempTable, overrideIfExists = false)
459+
assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
460+
assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")
461+
462+
val m3 = intercept[AnalysisException] {
463+
catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
464+
}.getMessage
465+
assert(m3.contains("Table or view 'view1' not found in database 'default'"))
466+
}
467+
435468
test("list tables without pattern") {
436469
val catalog = new SessionCatalog(newBasicCatalog())
437470
val tempTable = Range(1, 10, 2, 10)

sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ abstract class Catalog {
8585
def listFunctions(dbName: String): Dataset[Function]
8686

8787
/**
88-
* Returns a list of columns for the given table in the current database.
88+
* Returns a list of columns for the given table in the current database or
89+
* the given temporary table.
8990
*
9091
* @since 2.0.0
9192
*/

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
138138
*/
139139
@throws[AnalysisException]("table does not exist")
140140
override def listColumns(tableName: String): Dataset[Column] = {
141-
listColumns(currentDatabase, tableName)
141+
listColumns(TableIdentifier(tableName, None))
142142
}
143143

144144
/**
@@ -147,7 +147,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
147147
@throws[AnalysisException]("database or table does not exist")
148148
override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
149149
requireTableExists(dbName, tableName)
150-
val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName)))
150+
listColumns(TableIdentifier(tableName, Some(dbName)))
151+
}
152+
153+
private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
154+
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
151155
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
152156
val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
153157
val columns = tableMetadata.schema.map { c =>

sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,11 @@ class CatalogSuite
234234
testListColumns("tab1", dbName = None)
235235
}
236236

237+
test("list columns in temporary table") {
238+
createTempTable("temp1")
239+
spark.catalog.listColumns("temp1")
240+
}
241+
237242
test("list columns in database") {
238243
createDatabase("db1")
239244
createTable("tab1", Some("db1"))

0 commit comments

Comments
 (0)