@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.StructType
@@ -368,9 +368,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
+        val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+          CatalogTableType.EXTERNAL
+        } else {
+          CatalogTableType.MANAGED
+        }
+
         val tableDesc = CatalogTable(
           identifier = tableIdent,
-          tableType = CatalogTableType.EXTERNAL,
+          tableType = tableType,
           storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
           schema = new StructType,
           provider = Some(source),
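The user-visible effect of this hunk: DataFrameWriter.saveAsTable records a table as EXTERNAL only when the caller supplies a "path" option (matched case-insensitively via CaseInsensitiveMap), and as MANAGED otherwise. A minimal sketch of the two cases, assuming a running SparkSession named `spark`; the table names and paths are illustrative, not from this PR:

    import org.apache.spark.sql.SaveMode

    val df = spark.range(10).toDF("id")

    // Any casing of the option key counts: the catalog entry becomes
    // CatalogTableType.EXTERNAL and the data stays at the given location.
    df.write.option("PATH", "/tmp/t_external").saveAsTable("t_external")

    // No path: the table is MANAGED, its path defaults to the warehouse
    // directory, and DROP TABLE deletes the data as well.
    df.write.mode(SaveMode.ErrorIfExists).saveAsTable("t_managed")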
@@ -325,14 +325,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

+    // TODO: this may be wrong for non file-based data source like JDBC, which should be external
+    // even there is no `path` in options. We should consider allow the EXTERNAL keyword.
+    val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+      CatalogTableType.EXTERNAL
Member:
We can remove https://github.com/cloud-fan/spark/blob/0e9b3d2a81f0251f710eae42adca307cd9c69d33/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala#L316-L318, right? After this PR it becomes much clearer to users: we can support CREATE EXTERNAL data source tables. We just need to confirm that the path is provided.

Contributor Author:
Removing this means we change the syntax, and I'd like to avoid that in this PR; that's why I left this TODO here. We can do it in a follow-up.
+    } else {
+      CatalogTableType.MANAGED
+    }
+
     val tableDesc = CatalogTable(
       identifier = table,
-      // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
-      // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
-      // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
-      // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
-      // make it take `CatalogTable` directly.
-      tableType = CatalogTableType.MANAGED,
+      tableType = tableType,
       storage = CatalogStorageFormat.empty.copy(properties = options),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
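The same rule now applies at the SQL level: in CREATE TABLE ... USING, a "path" option marks the table external, and its absence marks it managed. A hedged illustration, assuming a SparkSession named `spark`; the table names and locations are made up:

    // External: the metadata points at an existing location, and the data
    // survives a later DROP TABLE.
    spark.sql("""
      CREATE TABLE ext_logs (id INT, msg STRING)
      USING parquet
      OPTIONS (path '/data/logs')
    """)

    // Managed: no path given, so the table is created under the warehouse
    // directory and is deleted together with its metadata.
    spark.sql("CREATE TABLE managed_logs (id INT, msg STRING) USING parquet")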
@@ -417,15 +417,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

     case CreateTable(tableDesc, mode, None) =>
       val cmd =
-        CreateDataSourceTableCommand(
-          tableDesc.identifier,
-          if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
-          tableDesc.provider.get,
-          tableDesc.storage.properties,
-          tableDesc.partitionColumnNames.toArray,
-          tableDesc.bucketSpec,
-          ignoreIfExists = mode == SaveMode.Ignore,
-          managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
+        CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
       ExecutedCommandExec(cmd) :: Nil

     // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
@@ -434,12 +426,8 @@
     case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
       val cmd =
         CreateDataSourceTableAsSelectCommand(
-          tableDesc.identifier,
-          tableDesc.provider.get,
-          tableDesc.partitionColumnNames.toArray,
-          tableDesc.bucketSpec,
+          tableDesc,
           mode,
-          tableDesc.storage.properties,
           query)
       ExecutedCommandExec(cmd) :: Nil
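Both planner arms now just forward the CatalogTable, so the SaveMode handling is the only remaining logic here. A toy model of that translation (not Spark's actual code, just the rule it encodes):

    import org.apache.spark.sql.SaveMode

    // Only SaveMode.Ignore makes CREATE TABLE tolerate an existing table;
    // every other mode surfaces the conflict inside the command itself.
    def ignoreIfExists(mode: SaveMode): Boolean = mode == SaveMode.Ignore

    assert(ignoreIfExists(SaveMode.Ignore))
    assert(!ignoreIfExists(SaveMode.ErrorIfExists))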
@@ -40,79 +40,64 @@ import org.apache.spark.sql.types._
  *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
  * }}}
  */
-case class CreateDataSourceTableCommand(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String],
-    userSpecifiedPartitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    ignoreIfExists: Boolean,
-    managedIfNoPath: Boolean)
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = tableIdent.unquotedString
-    val sessionState = sparkSession.sessionState
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)

-    if (sessionState.catalog.tableExists(tableIdent)) {
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table $tableName already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }

-    var isExternal = true
-    val optionsWithPath =
-      if (!new CaseInsensitiveMap(options).contains("path") && managedIfNoPath) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
-      } else {
-        options
-      }
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }

-    // Create the relation to validate the arguments before writing the metadata to the metastore.
+    // Create the relation to validate the arguments before writing the metadata to the metastore,
+    // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = provider,
-        bucketSpec = None,
+        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        className = table.provider.get,
+        bucketSpec = table.bucketSpec,
         options = optionsWithPath).resolveRelation(checkPathExist = false)

-    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
-      userSpecifiedPartitionColumns
+    val partitionColumnNames = if (table.schema.nonEmpty) {
+      table.partitionColumnNames
     } else {
       // This is guaranteed in `PreprocessDDL`.
-      assert(userSpecifiedPartitionColumns.isEmpty)
+      assert(table.partitionColumnNames.isEmpty)
       dataSource match {
-        case r: HadoopFsRelation => r.partitionSchema.fieldNames
-        case _ => Array.empty[String]
+        case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
+        case _ => Nil
       }
     }

-    val table = CatalogTable(
-      identifier = tableIdent,
-      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+    val newTable = table.copy(
+      storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      provider = Some(provider),
-      partitionColumnNames = partitionColumns,
-      bucketSpec = bucketSpec
-    )
-
+      partitionColumnNames = partitionColumnNames)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
-    sessionState.catalog.createTable(table, ignoreIfExists = false)
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     Seq.empty[Row]
   }
 }

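For intuition, the MANAGED branch above is the only place a location gets invented: it pins the data path to the session catalog's default table path, while EXTERNAL tables keep whatever the user supplied. A toy model of that optionsWithPath logic, using plain Maps and an illustrative warehouse path rather than the actual Spark helper:

    // Sketch only: mirrors the branch on CatalogTableType.MANAGED above.
    def optionsWithPath(
        props: Map[String, String],
        isManaged: Boolean,
        defaultPath: String): Map[String, String] =
      if (isManaged) props + ("path" -> defaultPath) else props

    // Managed: "path" is injected, e.g. <warehouse>/t1.
    assert(optionsWithPath(Map.empty, isManaged = true, "/warehouse/t1") ==
      Map("path" -> "/warehouse/t1"))

    // External: user-supplied options pass through untouched.
    assert(optionsWithPath(Map("path" -> "/data/t1"), isManaged = false, "/warehouse/t1") ==
      Map("path" -> "/data/t1"))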
 /**
  * A command used to create a data source table using the result of a query.
  *
- * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
+ * Note: This is different from `CreateHiveTableAsSelectCommand`. Please check the syntax for
  * difference. This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:

@@ -123,32 +108,31 @@ case class CreateDataSourceTableCommand(
  * }}}
  */
 case class CreateDataSourceTableAsSelectCommand(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
+    table: CatalogTable,
     mode: SaveMode,
-    options: Map[String, String],
     query: LogicalPlan)
   extends RunnableCommand {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = tableIdent.unquotedString
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    assert(table.schema.isEmpty)
+
+    val tableName = table.identifier.unquotedString
+    val provider = table.provider.get
     val sessionState = sparkSession.sessionState
-    var createMetastoreTable = false
-    var isExternal = true
-    val optionsWithPath =
-      if (!new CaseInsensitiveMap(options).contains("path")) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
-      } else {
-        options
-      }

+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }
+
+    var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
+    if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -165,21 +149,21 @@ case class CreateDataSourceTableAsSelectCommand(
         val dataSource = DataSource(
           sparkSession = sparkSession,
           userSpecifiedSchema = Some(query.schema.asNullable),
-          partitionColumns = partitionColumns,
-          bucketSpec = bucketSpec,
+          partitionColumns = table.partitionColumnNames,
+          bucketSpec = table.bucketSpec,
           className = provider,
           options = optionsWithPath)
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).

         EliminateSubqueryAliases(
-          sessionState.catalog.lookupRelation(tableIdent)) match {
+          sessionState.catalog.lookupRelation(table.identifier)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             // check if the file formats match
             l.relation match {
               case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
                 throw new AnalysisException(
-                  s"The file format of the existing table $tableIdent is " +
+                  s"The file format of the existing table $tableName is " +
                   s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
                   s"format `$provider`")
               case _ =>
@@ -216,36 +200,29 @@
     val dataSource = DataSource(
       sparkSession,
       className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
       options = optionsWithPath)

     val result = try {
       dataSource.write(mode, df)
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        logError(s"Failed to write to table $tableName in $mode mode", ex)
         throw ex
     }
     if (createMetastoreTable) {
-      // We will use the schema of resolved.relation as the schema of the table (instead of
-      // the schema of df). It is important since the nullability may be changed by the relation
-      // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      val schema = result.schema
-      val table = CatalogTable(
-        identifier = tableIdent,
-        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
-        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
-        schema = schema,
-        provider = Some(provider),
-        partitionColumnNames = partitionColumns,
-        bucketSpec = bucketSpec
-      )
-      sessionState.catalog.createTable(table, ignoreIfExists = false)
+      val newTable = table.copy(
+        storage = table.storage.copy(properties = optionsWithPath),
+        // We will use the schema of resolved.relation as the schema of the table (instead of
+        // the schema of df). It is important since the nullability may be changed by the relation
+        // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
+        schema = result.schema)
+      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }

     // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(table.identifier)
     Seq.empty[Row]
   }
 }
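Taken together, the CTAS command now derives everything (identifier, provider, partitioning, bucketing, options) from the single CatalogTable it receives. A hedged end-to-end sketch of the SaveMode behavior it implements, assuming a SparkSession named `spark`; the table name is made up:

    import org.apache.spark.sql.SaveMode

    spark.range(5).write.saveAsTable("ctas_demo")  // first create: table + data

    // SaveMode.Ignore: silently returns, existing data untouched.
    spark.range(5).write.mode(SaveMode.Ignore).saveAsTable("ctas_demo")

    // SaveMode.Append with a different format fails with an AnalysisException:
    // "The file format of the existing table ctas_demo is ... It doesn't match
    // the specified format `json`" (message shape per the tests below).
    spark.range(5).write.mode(SaveMode.Append).format("json").saveAsTable("ctas_demo")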
@@ -906,7 +906,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val e = intercept[AnalysisException] {
       createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
     }
-    assert(e.getMessage.contains("The file format of the existing table `appendOrcToParquet` " +
+    assert(e.getMessage.contains("The file format of the existing table appendOrcToParquet " +
       "is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " +
       "It doesn't match the specified format `orc`"))
   }
@@ -917,7 +917,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val e = intercept[AnalysisException] {
      createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
        .saveAsTable("appendParquetToJson")
     }
-    assert(e.getMessage.contains("The file format of the existing table `appendParquetToJson` " +
+    assert(e.getMessage.contains("The file format of the existing table appendParquetToJson " +
       "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
       "It doesn't match the specified format `parquet`"))
   }
@@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     val e = intercept[AnalysisException] {
      createDF(10, 19).write.mode(SaveMode.Append).format("text")
        .saveAsTable("appendTextToJson")
     }
-    assert(e.getMessage.contains("The file format of the existing table `appendTextToJson` is " +
+    assert(e.getMessage.contains("The file format of the existing table appendTextToJson is " +
       "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
       "It doesn't match the specified format `text`"))
   }