Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ statement
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
| CREATE (OR REPLACE)? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
| CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
(USING resource (',' resource)*)? #createFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -343,6 +343,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}

/**
* Creates a [[CreateTempViewUsing]] logical plan.
*/
override def visitCreateTempViewUsing(
ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) {
CreateTempViewUsing(
tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
replace = ctx.REPLACE != null,
provider = ctx.tableProvider.qualifiedName.getText,
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}

/**
* Create a [[LoadDataCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case c: CreateTableUsing if c.temporary && !c.allowExisting =>
logWarning(
s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
s"please use CREATE TEMPORARY VIEW viewName USING... instead")
ExecutedCommandExec(
CreateTempTableUsing(
c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil
CreateTempViewUsing(
c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil

case c: CreateTableUsing if !c.temporary =>
val cmd =
Expand Down Expand Up @@ -416,6 +419,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
c.child)
ExecutedCommandExec(cmd) :: Nil

case c: CreateTempViewUsing =>
ExecutedCommandExec(c) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ case class CreateTableUsingAsSelect(
override def output: Seq[Attribute] = Seq.empty[Attribute]
}

case class CreateTempTableUsing(
case class CreateTempViewUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
replace: Boolean,
provider: String,
options: Map[String, String]) extends RunnableCommand {

Expand All @@ -85,7 +86,7 @@ case class CreateTempTableUsing(
sparkSession.sessionState.catalog.createTempView(
tableIdent.table,
Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
overrideIfExists = true)
replace)

Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
Expand Down Expand Up @@ -422,6 +422,25 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

test("create temporary view using") {
val csvFile = Thread.currentThread().getContextClassLoader.getResource("cars.csv").toString()
withView("testview") {
sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String) USING " +
"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat " +
s"OPTIONS (PATH '$csvFile')")

checkAnswer(
sql("select c1, c2 from testview order by c1 limit 1"),
Row("1997", "Ford") :: Nil)

// Fails if creating a new view with the same name
intercept[TempTableAlreadyExistsException] {
sql(s"CREATE TEMPORARY VIEW testview USING " +
s"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat OPTIONS (PATH '$csvFile')")
}
}
}

test("alter table: rename") {
val catalog = spark.sessionState.catalog
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
Expand Down