
Commit d109a1b

clockfly authored and Andrew Or committed
[SPARK-15711][SQL] Ban CREATE TEMPORARY TABLE USING AS SELECT
## What changes were proposed in this pull request?

This PR bans syntax like `CREATE TEMPORARY TABLE ... USING ... AS SELECT`.

`CREATE TEMPORARY TABLE ... USING ... AS ...` is not properly implemented: the temporary data is not cleaned up when the session exits. Until there is a full fix, we should ban this syntax.

This PR only impacts syntax like `CREATE TEMPORARY TABLE ... USING ... AS ...`. Other syntax, such as `CREATE TEMPORARY TABLE ... USING ...` and `CREATE TABLE ... USING ...`, is not affected.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13451 from clockfly/ban_create_temp_table_using_as.
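For illustration only (not part of the patch): a minimal sketch of the affected and unaffected statements, assuming a live SparkSession named `spark`, an existing source table `src`, and hypothetical table names and scratch path.

import scala.util.Try

// Banned by this patch: CREATE TEMPORARY TABLE ... USING ... AS SELECT.
// The parser now rejects it rather than leaving temporary data behind
// when the session exits.
val banned = Try(spark.sql(
  "CREATE TEMPORARY TABLE t_tmp USING parquet AS SELECT * FROM src"))
println(s"banned statement accepted? ${banned.isSuccess}")  // expected: false

// Unaffected: CREATE TEMPORARY TABLE ... USING ... (no AS SELECT).
spark.sql("CREATE TEMPORARY TABLE t_src USING parquet OPTIONS (path '/tmp/t_src')")

// Unaffected: CREATE TABLE ... USING ... AS SELECT (not temporary).
spark.sql("CREATE TABLE t_saved USING parquet AS SELECT * FROM src")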
1 parent 9aff6f3 commit d109a1b

6 files changed, 142 insertions(+), 221 deletions(-)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 0 additions & 1 deletion
@@ -561,7 +561,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       CreateTableUsingAsSelect(
         tableIdent,
         source,
-        temporary = false,
         partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
         getBucketSpec,
         mode,

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 5 additions & 4 deletions
@@ -317,17 +317,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // Get the backing query.
       val query = plan(ctx.query)

+      if (temp) {
+        throw operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+      }
+
       // Determine the storage mode.
       val mode = if (ifNotExists) {
         SaveMode.Ignore
-      } else if (temp) {
-        SaveMode.Overwrite
       } else {
         SaveMode.ErrorIfExists
       }

       CreateTableUsingAsSelect(
-        table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
+        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(
@@ -960,7 +962,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     CreateTableUsingAsSelect(
       tableIdent = tableDesc.identifier,
       provider = conf.defaultDataSourceName,
-      temporary = false,
       partitionColumns = tableDesc.partitionColumnNames.toArray,
       bucketSpec = None,
       mode = mode,
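The description above says the change was verified with a unit test; that test is not shown in this excerpt. The following is only a hypothetical sketch of how the new parser guard could be exercised, assuming the `operationNotAllowed` call surfaces as a `ParseException` from `spark.sql`; the suite and test names are invented.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException
import org.scalatest.FunSuite

class BanCreateTempTableUsingAsSelectSuite extends FunSuite {
  test("CREATE TEMPORARY TABLE ... USING ... AS SELECT is rejected") {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("ban-temp-table-as-select")
      .getOrCreate()
    try {
      val e = intercept[ParseException] {
        spark.sql("CREATE TEMPORARY TABLE t_tmp USING parquet AS SELECT 1 AS id")
      }
      // The error text comes from the operationNotAllowed call added above.
      assert(e.getMessage.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
    } finally {
      spark.stop()
    }
  }
}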

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 9 deletions
@@ -397,15 +397,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new AnalysisException(
           "allowExisting should be set to false when creating a temporary table.")

-      case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
-        sys.error("Cannot create temporary partitioned table.")
-
-      case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelectCommand(
-          c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTableUsingAsSelect if !c.temporary =>
+      case c: CreateTableUsingAsSelect =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             c.tableIdent,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 0 additions & 32 deletions
@@ -56,7 +56,6 @@ case class CreateTableUsing(
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
-    temporary: Boolean,
     partitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     mode: SaveMode,
@@ -91,37 +90,6 @@ case class CreateTempTableUsing(
   }
 }

-case class CreateTempTableUsingAsSelectCommand(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-
-  if (tableIdent.database.isDefined) {
-    throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val df = Dataset.ofRows(sparkSession, query)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = None,
-      options = options)
-    val result = dataSource.write(mode, df)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
-      overrideIfExists = true)
-
-    Seq.empty[Row]
-  }
-}
-
 case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
