diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b12606e17d38..8ffa3891b411 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -137,14 +137,16 @@ case class CatalogTable( unsupportedFeatures: Seq[String] = Seq.empty) { // Verify that the provided columns are part of the schema - private val colNames = schema.map(_.name).toSet - private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = { - require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " + - s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") - } - requireSubsetOfSchema(partitionColumnNames, "partition") - requireSubsetOfSchema(sortColumnNames, "sort") - requireSubsetOfSchema(bucketColumnNames, "bucket") + // TODO: this restriction should be checked at the end of Analyzer. When building CatalogTable, + // the initial version might violate it. + // private val colNames = schema.map(_.name).toSet + // private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = { + // require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " + + // s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") + // } + // requireSubsetOfSchema(partitionColumnNames, "partition") + // requireSubsetOfSchema(sortColumnNames, "sort") + // requireSubsetOfSchema(bucketColumnNames, "bucket") /** Columns this table is partitioned by. */ def partitionColumns: Seq[CatalogColumn] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 12b304623d30..281f39169fb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -23,8 +23,9 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} -import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableAsSelect, DataSource, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils /** @@ -366,14 +367,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException(s"Table $tableIdent already exists.") case _ => + val bucketSpec = getBucketSpec + val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty) + val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty) + val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1) + + val tableDesc = CatalogTable( + identifier = tableIdent, + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = Seq.empty[CatalogColumn], + partitionColumnNames = partitioningColumns.getOrElse(Seq.empty[String]), + sortColumnNames = sortColumnNames, + bucketColumnNames = 
bucketColumnNames, + numBuckets = numBuckets, + properties = extraOptions.toMap) + val cmd = - CreateTableUsingAsSelect( - tableIdent, + CreateTableAsSelect( + tableDesc, source, - partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), - getBucketSpec, mode, - extraOptions.toMap, df.logicalPlan) df.sparkSession.sessionState.executePlan(cmd).toRdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ed4ccdb4c8d4..902ecbaa18c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} -import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CreateTableAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} @@ -175,7 +175,7 @@ class Dataset[T] private[sql]( def hasSideEffects(plan: LogicalPlan): Boolean = plan match { case _: Command | _: InsertIntoTable | - _: CreateTableUsingAsSelect => true + _: CreateTableAsSelect => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f77801fd86c1..5376471d99a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.types.DataType @@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan. + * Create a [[CreateTableUsing]] or a [[CreateTableAsSelect]] logical plan. 
*/ override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) @@ -340,8 +340,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { SaveMode.ErrorIfExists } - CreateTableUsingAsSelect( - table, provider, partitionColumnNames, bucketSpec, mode, options, query) + val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty) + val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty) + val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1) + + val tableDesc = CatalogTable( + identifier = table, + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = Seq.empty[CatalogColumn], + partitionColumnNames = partitionColumnNames, + sortColumnNames = sortColumnNames, + bucketColumnNames = bucketColumnNames, + numBuckets = numBuckets, + properties = options) + + CreateTableAsSelect( + tableDesc = tableDesc, provider = provider, mode = mode, child = query) } else { val struct = Option(ctx.colTypeList()).map(createStructType) CreateTableUsing( @@ -890,8 +905,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning either a [[CreateTableCommand]] or a - * [[CreateHiveTableAsSelectLogicalPlan]]. + * Create a table, returning either a [[CreateTableCommand]] or a [[CreateTableAsSelect]]. * * This is not used to create datasource tables, which is handled through * "CREATE TABLE ... USING ...". @@ -1004,6 +1018,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { properties = properties, comment = comment) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + selectQuery match { case Some(q) => // Just use whatever is projected in the select statement as our schema @@ -1023,27 +1039,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null) - if (conf.convertCTAS && !hasStorageProperties) { - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties - // are empty Maps. 
- val optionsWithPath = if (location.isDefined) { - Map("path" -> location.get) - } else { - Map.empty[String, String] - } - CreateTableUsingAsSelect( - tableIdent = tableDesc.identifier, - provider = conf.defaultDataSourceName, - partitionColumns = tableDesc.partitionColumnNames.toArray, - bucketSpec = None, - mode = mode, - options = optionsWithPath, - q - ) - } else { - CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) - } + + val provider = + if (conf.convertCTAS && !hasStorageProperties) conf.defaultDataSourceName else "hive" + + CreateTableAsSelect( + tableDesc = tableDesc, + provider = provider, + mode = mode, + child = q) + case None => CreateTableCommand(tableDesc, ifNotExists) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 5e643ea75a16..586734944847 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -444,15 +444,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw new AnalysisException( "allowExisting should be set to false when creating a temporary table.") - case c: CreateTableUsingAsSelect => + case c: CreateTableAsSelect if c.provider != "hive" => val cmd = CreateDataSourceTableAsSelectCommand( - c.tableIdent, + c.tableDesc, c.provider, - c.partitionColumns, - c.bucketSpec, c.mode, - c.options, c.child) ExecutedCommandExec(cmd) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index c38eca5156e5..aeab99fca992 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -130,12 +130,9 @@ case class CreateDataSourceTableCommand( * }}} */ case class CreateDataSourceTableAsSelectCommand( - tableIdent: TableIdentifier, + tableDesc: CatalogTable, provider: String, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], mode: SaveMode, - options: Map[String, String], query: LogicalPlan) extends RunnableCommand { @@ -146,31 +143,41 @@ case class CreateDataSourceTableAsSelectCommand( // the table name and database name we have for this query. MetaStoreUtils.validateName // is the method used by Hive to check if a table name or a database name is valid for // the metastore. - if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) { - throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") + if (!CreateDataSourceTableUtils.validateName(tableDesc.identifier.table)) { + throw new AnalysisException(s"Table name ${tableDesc.identifier.table} is not a valid name " + + s"for metastore. Metastore only accepts table name containing characters, numbers and _.") } - if (tableIdent.database.isDefined && - !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) { - throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + - s"for metastore. 
Metastore only accepts database name containing " + + if (tableDesc.identifier.database.isDefined && + !CreateDataSourceTableUtils.validateName(tableDesc.identifier.database.get)) { + throw new AnalysisException(s"Database name ${tableDesc.identifier.database.get} is not " + + s"a valid name for metastore. Metastore only accepts database name containing " + s"characters, numbers and _.") } - val tableName = tableIdent.unquotedString + val tableName = tableDesc.identifier.unquotedString val sessionState = sparkSession.sessionState var createMetastoreTable = false var isExternal = true val optionsWithPath = - if (!new CaseInsensitiveMap(options).contains("path")) { + if (tableDesc.storage.locationUri.nonEmpty) { + tableDesc.properties + ("path" -> tableDesc.storage.locationUri.get) + } else if (!new CaseInsensitiveMap(tableDesc.properties).contains("path")) { isExternal = false - options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent)) + tableDesc.properties + + ("path" -> sessionState.catalog.defaultTablePath(tableDesc.identifier)) } else { - options + tableDesc.properties } + val bucketSpec: Option[BucketSpec] = if (tableDesc.numBuckets > 0) { + Option(BucketSpec( + tableDesc.numBuckets, tableDesc.bucketColumnNames, tableDesc.sortColumnNames)) + } else { + None + } + var existingSchema = Option.empty[StructType] - if (sparkSession.sessionState.catalog.tableExists(tableIdent)) { + if (sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -187,7 +194,7 @@ case class CreateDataSourceTableAsSelectCommand( val dataSource = DataSource( sparkSession = sparkSession, userSpecifiedSchema = Some(query.schema.asNullable), - partitionColumns = partitionColumns, + partitionColumns = tableDesc.partitionColumnNames, bucketSpec = bucketSpec, className = provider, options = optionsWithPath) @@ -195,13 +202,13 @@ case class CreateDataSourceTableAsSelectCommand( // inserting into (i.e. using the same compression). EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(tableIdent)) match { + sessionState.catalog.lookupRelation(tableDesc.identifier)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => // check if the file formats match l.relation match { case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass => throw new AnalysisException( - s"The file format of the existing table $tableIdent is " + + s"The file format of the existing table ${tableDesc.identifier} is " + s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " + s"format `$provider`") case _ => @@ -238,7 +245,7 @@ case class CreateDataSourceTableAsSelectCommand( val dataSource = DataSource( sparkSession, className = provider, - partitionColumns = partitionColumns, + partitionColumns = tableDesc.partitionColumnNames, bucketSpec = bucketSpec, options = optionsWithPath) @@ -246,7 +253,7 @@ case class CreateDataSourceTableAsSelectCommand( dataSource.write(mode, df) } catch { case ex: AnalysisException => - logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex) + logError(s"Failed to write to table ${tableDesc.identifier} in $mode mode", ex) throw ex } if (createMetastoreTable) { @@ -255,9 +262,9 @@ case class CreateDataSourceTableAsSelectCommand( // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). 
CreateDataSourceTableUtils.createDataSourceTable( sparkSession = sparkSession, - tableIdent = tableIdent, + tableIdent = tableDesc.identifier, userSpecifiedSchema = Some(result.schema), - partitionColumns = partitionColumns, + partitionColumns = tableDesc.partitionColumnNames.toArray, bucketSpec = bucketSpec, provider = provider, options = optionsWithPath, @@ -265,7 +272,7 @@ case class CreateDataSourceTableAsSelectCommand( } // Refresh the cache of the table in the catalog. - sessionState.catalog.refreshTable(tableIdent) + sessionState.catalog.refreshTable(tableDesc.identifier) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5c815df0deb9..d01762b16f34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -33,28 +33,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, Catal import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -case class CreateHiveTableAsSelectLogicalPlan( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean) extends UnaryNode with Command { - - override def output: Seq[Attribute] = Seq.empty[Attribute] - - override lazy val resolved: Boolean = - tableDesc.identifier.database.isDefined && - tableDesc.schema.nonEmpty && - tableDesc.storage.serde.isDefined && - tableDesc.storage.inputFormat.isDefined && - tableDesc.storage.outputFormat.isDefined && - childrenResolved -} - /** * A command to create a table with the same definition of the given existing table. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 31a2075d2ff9..105530865fba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -53,13 +54,10 @@ case class CreateTableUsing( * analyzer can analyze the logical plan that will be used to populate the table. * So, [[PreWriteCheck]] can detect cases that are not allowed. 
*/ -case class CreateTableUsingAsSelect( - tableIdent: TableIdentifier, +case class CreateTableAsSelect( + tableDesc: CatalogTable, provider: String, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], mode: SaveMode, - options: Map[String, String], child: LogicalPlan) extends logical.UnaryNode { override def output: Seq[Attribute] = Seq.empty[Attribute] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 15b9d14bd73f..034cd478339a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -206,12 +206,12 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") - case c: CreateTableUsingAsSelect => + case c: CreateTableAsSelect if c.provider != "hive" => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) { + if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableDesc.identifier)) { // Need to remove SubQuery operator. - EliminateSubqueryAliases(catalog.lookupRelation(c.tableIdent)) match { + EliminateSubqueryAliases(catalog.lookupRelation(c.tableDesc.identifier)) match { // Only do the check if the table is a data source table // (the relation is a BaseRelation). case l @ LogicalRelation(dest: BaseRelation, _, _) => @@ -221,7 +221,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } if (srcRelations.contains(dest)) { failAnalysis( - s"Cannot overwrite table ${c.tableIdent} that is also being read from.") + s"Cannot overwrite table ${c.tableDesc.identifier} that is also being read from.") } else { // OK } @@ -233,11 +233,10 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } PartitioningUtils.validatePartitionColumn( - c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) + c.child.schema, c.tableDesc.partitionColumnNames, conf.caseSensitiveAnalysis) for { - spec <- c.bucketSpec - sortColumnName <- spec.sortColumnNames + sortColumnName <- c.tableDesc.sortColumnNames sortColumn <- c.child.schema.find(_.name == sortColumnName) } { if (!RowOrdering.isOrderable(sortColumn.dataType)) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2be51ed0e87e..eb9ede29b9ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -437,24 +436,14 @@ private[hive] class 
HiveMetastoreCatalog(sparkSession: SparkSession) extends Log object CreateTables extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - case p: LogicalPlan if p.resolved => p - - case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => - val desc = if (table.storage.serde.isEmpty) { - // add default serde - table.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - table - } - - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) + case p: LogicalPlan if !p.resolved => p + case p @ CreateTableAsSelect(tableDesc, provider, mode, child) if provider == "hive" => + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc.identifier) execution.CreateHiveTableAsSelectCommand( - desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), + tableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), child, - allowExisting) + mode) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 15a5d79dcb08..bdfb96e76665 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution import scala.util.control.NonFatal -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand @@ -31,14 +31,16 @@ import org.apache.spark.sql.hive.MetastoreRelation * * @param tableDesc the Table Describe, which may contains serde, storage handler etc. * @param query the query whose result will be insert into the new relation - * @param ignoreIfExists allow continue working if it's already exists, otherwise - * raise exception + * @param mode specifies the behavior when data or table already exists. Options include: + * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). + * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. + * - `SaveMode.Overwrite` and `SaveMode.Append`: N/A */ private[hive] case class CreateHiveTableAsSelectCommand( tableDesc: CatalogTable, query: LogicalPlan, - ignoreIfExists: Boolean) + mode: SaveMode) extends RunnableCommand { private val tableIdentifier = tableDesc.identifier @@ -83,7 +85,9 @@ case class CreateHiveTableAsSelectCommand( // add the relation into catalog, just in case of failure occurs while data // processing. if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { - if (ignoreIfExists) { + // CTAS in SQL does not expose these two options to users. 
+ assert(mode == SaveMode.Ignore || mode == SaveMode.ErrorIfExists) + if (mode == SaveMode.Ignore) { // table already exists, will do nothing, to keep consistent with Hive } else { throw new AnalysisException(s"$tableIdentifier already exists.") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 867aadb5f556..ed649cfeef3a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources.CreateTableAsSelect import org.apache.spark.sql.hive.test.TestHive class HiveDDLCommandSuite extends PlanTest { @@ -36,7 +37,8 @@ class HiveDDLCommandSuite extends PlanTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { case c: CreateTableCommand => (c.table, c.ifNotExists) - case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting) + case c: CreateTableAsSelect if c.provider == "hive" => + (c.tableDesc, c.mode == SaveMode.Ignore) case c: CreateViewCommand => (c.tableDesc, c.allowExisting) }.head }
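
The patch above reroutes DataFrameWriter.saveAsTable through the new CatalogTable-backed CreateTableAsSelect plan. Below is a minimal end-to-end usage sketch, assuming a local SparkSession, the built-in parquet provider, and an illustrative table name ctas_demo (none of which come from the patch itself); it only exercises the public API touched by this change.

import org.apache.spark.sql.{SaveMode, SparkSession}

object CtasRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ctas-roundtrip")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")

    // saveAsTable now builds a CatalogTable with flat partitioning/bucketing
    // fields and wraps it in CreateTableAsSelect (see the DataFrameWriter hunk).
    df.write
      .format("parquet")
      .partitionBy("name")
      .bucketBy(4, "id")
      .sortBy("id")
      .mode(SaveMode.ErrorIfExists)
      .saveAsTable("ctas_demo")

    spark.stop()
  }
}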
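
A second recurring pattern in the patch is that CreateTableAsSelect no longer carries a BucketSpec or an allowExisting flag: bucketing is flattened into numBuckets/bucketColumnNames/sortColumnNames on CatalogTable (with numBuckets = -1 meaning "not bucketed"), and the old boolean becomes a SaveMode. The following is a small self-contained sketch of those two conventions, using hypothetical stand-in types (BucketSpecSketch, TableDescSketch, CtasModeSketch) rather than the real Spark classes.

import org.apache.spark.sql.SaveMode

// Stand-in for org.apache.spark.sql.execution.datasources.BucketSpec.
case class BucketSpecSketch(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

// Stand-in for the bucketing-related fields of CatalogTable.
case class TableDescSketch(
    numBuckets: Int = -1,                        // -1 encodes "not bucketed"
    bucketColumnNames: Seq[String] = Seq.empty,
    sortColumnNames: Seq[String] = Seq.empty) {

  // Mirrors the reconstruction in CreateDataSourceTableAsSelectCommand:
  // only materialize a bucket spec when numBuckets is positive.
  def bucketSpec: Option[BucketSpecSketch] =
    if (numBuckets > 0) {
      Some(BucketSpecSketch(numBuckets, bucketColumnNames, sortColumnNames))
    } else {
      None
    }
}

object CtasModeSketch {
  // Mirrors how SparkSqlAstBuilder maps IF NOT EXISTS onto a SaveMode; the
  // Hive CTAS command then asserts it only ever sees Ignore or ErrorIfExists.
  def ctasSaveMode(ifNotExists: Boolean): SaveMode =
    if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
}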