diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 920a748e97ca..4843051c6eff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -479,48 +479,24 @@ object QueryCompilationErrors {
     new AnalysisException("ADD COLUMN with v1 tables cannot specify NOT NULL.")
   }
 
-  def replaceColumnsOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
-  }
-
-  def alterQualifiedColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("ALTER COLUMN with qualified column is only supported with v2 tables.")
+  def operationOnlySupportedWithV2TableError(operation: String): Throwable = {
+    new AnalysisException(s"$operation is only supported with v2 tables.")
   }
 
   def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = {
     new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT NULL.")
   }
 
-  def alterOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
-  }
-
   def alterColumnCannotFindColumnInV1TableError(colName: String, v1Table: V1Table): Throwable = {
     new AnalysisException(
       s"ALTER COLUMN cannot find column $colName in v1 table. " +
         s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
   }
 
-  def renameColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
-  }
-
-  def dropColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("DROP COLUMN is only supported with v2 tables.")
-  }
-
   def invalidDatabaseNameError(quoted: String): Throwable = {
     new AnalysisException(s"The database name is not valid: $quoted")
   }
 
-  def replaceTableOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
-  }
-
-  def replaceTableAsSelectOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
-  }
-
   def cannotDropViewWithDropTableError(): Throwable = {
     new AnalysisException("Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
   }
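The six single-purpose builders removed above collapse into one parameterized helper, so every "... is only supported with v2 tables." message now comes from a single code path and a new operation needs no new method. A minimal self-contained sketch of the resulting shape (the stand-in AnalysisException below is illustrative; Spark's real class lives in org.apache.spark.sql):

    // Sketch only: stand-in for Spark's AnalysisException, assumed to wrap a message.
    class AnalysisException(message: String) extends Exception(message)

    object QueryCompilationErrors {
      // One builder replaces the per-operation *OnlySupportedWithV2TableError methods.
      def operationOnlySupportedWithV2TableError(operation: String): Throwable =
        new AnalysisException(s"$operation is only supported with v2 tables.")
    }

Call sites pass the user-facing operation name verbatim, e.g. operationOnlySupportedWithV2TableError("DROP COLUMN") yields "DROP COLUMN is only supported with v2 tables."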
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d5f99bfbf70a..15798e0b8cf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -56,17 +56,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
 
     case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
-      throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
+      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
       if (a.column.name.length > 1) {
-        throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
       }
       if (a.nullable.isDefined) {
         throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
       }
       if (a.position.isDefined) {
-        throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("ALTER COLUMN ... FIRST | ALTER")
       }
       val builder = new MetadataBuilder
       // Add comment to metadata
@@ -88,10 +90,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
 
     case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
-      throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
+      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")
 
     case DropColumns(ResolvedV1TableIdentifier(_), _) =>
-      throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
+      throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
@@ -145,35 +147,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       // session catalog and the table provider is not v2.
     case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.tableSpec.provider,
-        c.tableSpec.options,
-        c.tableSpec.location,
-        c.tableSpec.serde,
+        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
         ctas = false)
       if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema,
-          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
-          c.tableSpec.location, c.tableSpec.comment, storageFormat,
-          c.tableSpec.external)
-        val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, None)
+        constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, c.partitioning,
+          c.ignoreIfExists, storageFormat, provider)
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
           tableSpec = newTableSpec)
       }
 
-    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
         ctas = true)
-      if (!isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType,
-          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
-          c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external)
-        val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, Some(c.query))
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+        constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
+          c.ignoreIfExists, storageFormat, provider)
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -189,21 +180,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
       // session catalog and the table provider is not v2.
    case c @ ReplaceTable(
-        ResolvedDBObjectName(catalog, name), _, _, _, _) =>
+        ResolvedDBObjectName(catalog, _), _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
-        throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
+        throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE TABLE")
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
           tableSpec = newTableSpec)
       }
 
-    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
-      if (!isV2Provider(provider)) {
-        throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("REPLACE TABLE AS SELECT")
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -294,7 +285,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         ident.asTableIdentifier,
         Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
 
-    case s @ ShowPartitions(
+    case ShowPartitions(
         ResolvedV1TableOrViewIdentifier(ident),
         pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
       ShowPartitionsCommand(
@@ -302,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         output,
         pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
 
-    case s @ ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
+    case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
       val v1TableName = ident.asTableIdentifier
       val resolver = conf.resolver
       val db = ns match {
@@ -385,10 +376,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
 
     // If target is view, force use v1 command
-    case s @ ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
+    case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
 
-    case s @ ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
+    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
         if conf.useV1Command =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
 
@@ -435,6 +426,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
   }
 
+  private def constructV1TableCmd(
+      query: Option[LogicalPlan],
+      tableSpec: TableSpec,
+      name: Seq[String],
+      tableSchema: StructType,
+      partitioning: Seq[Transform],
+      ignoreIfExists: Boolean,
+      storageFormat: CatalogStorageFormat,
+      provider: String): CreateTableV1 = {
+    val tableDesc = buildCatalogTable(name.asTableIdentifier, tableSchema,
+      partitioning, tableSpec.bucketSpec, tableSpec.properties, provider,
+      tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
+    val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+    CreateTableV1(tableDesc, mode, query)
+  }
+
   private def getStorageFormatAndProvider(
       provider: Option[String],
       options: Map[String, String],
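Two things change shape in this file: the isSessionCatalog(catalog) guard moves out of the case patterns and into the branch conditions, so CREATE/REPLACE TABLE [AS SELECT] against a non-session catalog now always falls through to the v2 branch, and the duplicated v1 CreateTableV1 construction is factored into constructV1TableCmd. A reduced sketch of that dispatch, using simplified placeholder types (Plan, V1Command, V2Plan are illustrative, not Spark's):

    // Reduced model of the dispatch above: only when both conditions hold do we
    // fall back to a v1 command; otherwise we stay on the v2 plan and fold the
    // bucket spec into the partitioning transforms.
    sealed trait Plan
    case class V1Command(query: Option[String]) extends Plan
    case class V2Plan(partitioning: Seq[String], bucketSpec: Option[String]) extends Plan

    def dispatch(
        isSessionCatalog: Boolean,
        isV2Provider: Boolean,
        query: Option[String],
        plan: V2Plan): Plan =
      if (isSessionCatalog && !isV2Provider) {
        V1Command(query) // shared construction, as constructV1TableCmd does above
      } else {
        plan.copy(partitioning = plan.partitioning ++ plan.bucketSpec, bucketSpec = None)
      }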
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2d3c89874f59..f72d03ecc62b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -289,7 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
      * Note, currently the new table creation by this API doesn't fully cover the V2 table.
      * TODO (SPARK-33638): Full support of v2 table creation
      */
-    val tableProperties = TableSpec(
+    val tableSpec = TableSpec(
       None,
       Map.empty[String, String],
       Some(source),
@@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         isNamespace = false),
       df.schema.asNullable,
       partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
-      tableProperties,
+      tableSpec,
       ignoreIfExists = false)
     Dataset.ofRows(df.sparkSession, cmd)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 44d79541d6c6..a6e1c788c62f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -407,6 +407,44 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-36850: CreateTableAsSelect partitions can be specified using " +
+    "PARTITIONED BY and/or CLUSTERED BY") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      spark.sql(s"CREATE TABLE $identifier USING foo PARTITIONED BY (id) " +
+        s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "id")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, data)")
+    }
+  }
+
+  test("SPARK-36850: ReplaceTableAsSelect partitions can be specified using " +
+    "PARTITIONED BY and/or CLUSTERED BY") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      spark.sql(s"CREATE TABLE $identifier USING foo " +
+        "AS SELECT id FROM source")
+      spark.sql(s"REPLACE TABLE $identifier USING foo PARTITIONED BY (id) " +
+        s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "id")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, data)")
+    }
+  }
+
   test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
     val basicIdentifier = "testcat.table_name"
     val atomicIdentifier = "testcat_atomic.table_name"
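The two SPARK-36850 tests pin down the user-visible effect of the guard change: against a v2 catalog, CTAS and RTAS accept PARTITIONED BY and CLUSTERED BY together, and DESCRIBE reports the bucketing as a bucket(...) transform. A minimal spark-shell sketch of the same flow (assumes a v2 catalog registered as testcat and a source table with id and data columns, as in the suite):

    // One CTAS that both partitions by `id` and buckets `data` into 4 buckets.
    spark.sql("CREATE TABLE testcat.t USING foo PARTITIONED BY (id) " +
      "CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")

    // DESCRIBE lists both transforms:
    //   Part 0  id
    //   Part 1  bucket(4, data)
    spark.sql("DESCRIBE testcat.t").show(truncate = false)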