Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,48 +479,24 @@ object QueryCompilationErrors {
new AnalysisException("ADD COLUMN with v1 tables cannot specify NOT NULL.")
}

/**
 * Builds the [[AnalysisException]] reporting that REPLACE COLUMNS is only supported
 * with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def replaceColumnsOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that ALTER COLUMN with a qualified column
 * is only supported with v2 tables.
 *
 * Kept as a thin wrapper so existing callers keep compiling; the message text is
 * produced by [[operationOnlySupportedWithV2TableError]] and is unchanged.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def alterQualifiedColumnOnlySupportedWithV2TableError(): Throwable = {
  operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
}

/**
 * Builds the [[AnalysisException]] reporting that the given operation is only supported
 * with v2 tables.
 *
 * @param operation display name of the unsupported operation (for example "REPLACE COLUMNS");
 *                  it is placed verbatim at the start of the message
 * @return the exception instance; the caller is responsible for throwing it
 */
def operationOnlySupportedWithV2TableError(operation: String): Throwable = {
  new AnalysisException(s"$operation is only supported with v2 tables.")
}

/**
 * Builds the [[AnalysisException]] reporting that ALTER COLUMN on a v1 table cannot
 * specify NOT NULL.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable =
  new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT NULL.")

/**
 * Builds the [[AnalysisException]] reporting that the positional form of ALTER COLUMN
 * ("ALTER COLUMN ... FIRST | ALTER") is only supported with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def alterOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that ALTER COLUMN could not find the
 * requested column in a v1 table.
 *
 * @param colName name of the column that was looked up
 * @param v1Table table whose schema field names are listed in the message as the
 *                available columns
 * @return the exception instance; the caller is responsible for throwing it
 */
def alterColumnCannotFindColumnInV1TableError(colName: String, v1Table: V1Table): Throwable = {
  // Comma-separated list of every field name in the table schema.
  val availableColumns = v1Table.schema.fieldNames.mkString(", ")
  new AnalysisException(
    s"ALTER COLUMN cannot find column $colName in v1 table. Available: $availableColumns")
}

/**
 * Builds the [[AnalysisException]] reporting that RENAME COLUMN is only supported
 * with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def renameColumnOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("RENAME COLUMN is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that DROP COLUMN is only supported
 * with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def dropColumnOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("DROP COLUMN is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that a database name is not valid.
 *
 * @param quoted the offending database name; it is appended verbatim to the message
 * @return the exception instance; the caller is responsible for throwing it
 */
def invalidDatabaseNameError(quoted: String): Throwable =
  new AnalysisException(s"The database name is not valid: $quoted")

/**
 * Builds the [[AnalysisException]] reporting that REPLACE TABLE is only supported
 * with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def replaceTableOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("REPLACE TABLE is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that REPLACE TABLE AS SELECT is only
 * supported with v2 tables.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def replaceTableAsSelectOnlySupportedWithV2TableError(): Throwable =
  new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")

/**
 * Builds the [[AnalysisException]] reporting that a view cannot be dropped with
 * DROP TABLE and that DROP VIEW should be used instead.
 *
 * @return the exception instance; the caller is responsible for throwing it
 */
def cannotDropViewWithDropTableError(): Throwable =
  new AnalysisException("Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))

case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")

case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
throw QueryCompilationErrors
.operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
}
if (a.nullable.isDefined) {
throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
}
if (a.position.isDefined) {
throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
throw QueryCompilationErrors
.operationOnlySupportedWithV2TableError("ALTER COLUMN ... FIRST | ALTER")
}
val builder = new MetadataBuilder
// Add comment to metadata
Expand All @@ -88,10 +90,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)

case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")

case DropColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")

case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
Expand Down Expand Up @@ -145,35 +147,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
c.tableSpec.options,
c.tableSpec.location,
c.tableSpec.serde,
c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = false)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema,
c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
c.tableSpec.location, c.tableSpec.comment, storageFormat,
c.tableSpec.external)
val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableV1(tableDesc, mode, None)
constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
if isSessionCatalog(catalog) =>
case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = true)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType,
c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external)
val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableV1(tableDesc, mode, Some(c.query))
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
Expand All @@ -189,21 +180,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ ReplaceTable(
ResolvedDBObjectName(catalog, name), _, _, _, _) =>
ResolvedDBObjectName(catalog, _), _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE TABLE")
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
if isSessionCatalog(catalog) =>
case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it seems we don't have a test for this case? Otherwise, the previously misplaced `isSessionCatalog(catalog)` guard should have been detected.

val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
throw QueryCompilationErrors
.operationOnlySupportedWithV2TableError("REPLACE TABLE AS SELECT")
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
Expand Down Expand Up @@ -294,15 +285,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ident.asTableIdentifier,
Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)

case s @ ShowPartitions(
case ShowPartitions(
ResolvedV1TableOrViewIdentifier(ident),
pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
ShowPartitionsCommand(
ident.asTableIdentifier,
output,
pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

case s @ ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
val v1TableName = ident.asTableIdentifier
val resolver = conf.resolver
val db = ns match {
Expand Down Expand Up @@ -385,10 +376,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}

// If target is view, force use v1 command
case s @ ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)

case s @ ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
if conf.useV1Command =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)

Expand Down Expand Up @@ -435,6 +426,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}

/**
 * Assembles a [[CreateTableV1]] command from the pieces of a create-table plan.
 *
 * The catalog table description is produced by `buildCatalogTable` from the table
 * name, schema, partitioning, the relevant `tableSpec` fields, the provider and the
 * storage format.
 *
 * @param query          optional query passed through unchanged as the third argument
 *                       of [[CreateTableV1]]
 * @param tableSpec      source of the bucket spec, properties, location, comment and
 *                       external flag
 * @param name           multi-part table name, converted with `asTableIdentifier`
 * @param tableSchema    schema handed to `buildCatalogTable`
 * @param partitioning   partition transforms handed to `buildCatalogTable`
 * @param ignoreIfExists selects `SaveMode.Ignore` when true, `SaveMode.ErrorIfExists`
 *                       otherwise
 * @param storageFormat  storage format handed to `buildCatalogTable`
 * @param provider       provider name handed to `buildCatalogTable`
 * @return the resulting [[CreateTableV1]] command
 */
private def constructV1TableCmd(
    query: Option[LogicalPlan],
    tableSpec: TableSpec,
    name: Seq[String],
    tableSchema: StructType,
    partitioning: Seq[Transform],
    ignoreIfExists: Boolean,
    storageFormat: CatalogStorageFormat,
    provider: String): CreateTableV1 = {
  val saveMode =
    if (ignoreIfExists) {
      SaveMode.Ignore
    } else {
      SaveMode.ErrorIfExists
    }
  val catalogTable = buildCatalogTable(
    name.asTableIdentifier,
    tableSchema,
    partitioning,
    tableSpec.bucketSpec,
    tableSpec.properties,
    provider,
    tableSpec.location,
    tableSpec.comment,
    storageFormat,
    tableSpec.external)
  CreateTableV1(catalogTable, saveMode, query)
}

private def getStorageFormatAndProvider(
provider: Option[String],
options: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* Note, currently the new table creation by this API doesn't fully cover the V2 table.
* TODO (SPARK-33638): Full support of v2 table creation
*/
val tableProperties = TableSpec(
val tableSpec = TableSpec(
None,
Map.empty[String, String],
Some(source),
Expand All @@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
isNamespace = false),
df.schema.asNullable,
partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
tableProperties,
tableSpec,
ignoreIfExists = false)
Dataset.ofRows(df.sparkSession, cmd)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,44 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-36850: CreateTableAsSelect partitions can be specified using " +
  "PARTITIONED BY and/or CLUSTERED BY") {
  val tableName = "testcat.table_name"
  withTable(tableName) {
    // CTAS that combines PARTITIONED BY with CLUSTERED BY.
    spark.sql(s"CREATE TABLE $tableName USING foo PARTITIONED BY (id) " +
      "CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
    val described = spark.sql(s"DESCRIBE $tableName")

    // Reads the `data_type` cell of the first DESCRIBE row whose `col_name` matches.
    def dataTypeOf(colName: String): String = {
      described
        .filter(s"col_name = '$colName'")
        .select("data_type")
        .head
        .getString(0)
    }

    // The partition column is listed first, followed by the bucket transform.
    assert(dataTypeOf("Part 0") === "id")
    assert(dataTypeOf("Part 1") === "bucket(4, data)")
  }
}

test("SPARK-36850: ReplaceTableAsSelect partitions can be specified using " +
  "PARTITIONED BY and/or CLUSTERED BY") {
  val tableName = "testcat.table_name"
  withTable(tableName) {
    // Create the table first, then replace it with one that combines
    // PARTITIONED BY with CLUSTERED BY.
    spark.sql(s"CREATE TABLE $tableName USING foo " + "AS SELECT id FROM source")
    spark.sql(s"REPLACE TABLE $tableName USING foo PARTITIONED BY (id) " +
      "CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
    val described = spark.sql(s"DESCRIBE $tableName")

    // Reads the `data_type` cell of the first DESCRIBE row whose `col_name` matches.
    def dataTypeOf(colName: String): String = {
      described
        .filter(s"col_name = '$colName'")
        .select("data_type")
        .head
        .getString(0)
    }

    // The partition column is listed first, followed by the bucket transform.
    assert(dataTypeOf("Part 0") === "id")
    assert(dataTypeOf("Part 1") === "bucket(4, data)")
  }
}

test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Expand Down