diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 8910ca86de477..408c97acaa39c 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -571,6 +571,11 @@ ], "sqlState" : "42809" }, + "GENERATED_COLUMN_WITH_DEFAULT_VALUE" : { + "message" : [ + "A column cannot have both a default value and a generation expression but column <colName> has default value: (<defaultValue>) and generation expression: (<genExpr>)." + ] + }, "GRAPHITE_SINK_INVALID_PROTOCOL" : { "message" : [ "Invalid Graphite protocol: <protocol>." @@ -1607,6 +1612,11 @@ }, "sqlState" : "0A000" }, + "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN" : { + "message" : [ + "Cannot create generated column <fieldName> with generation expression <expressionStr> because <reason>." + ] + }, "UNSUPPORTED_EXPR_FOR_OPERATOR" : { "message" : [ "A query operator contains one or more unsupported expressions. Consider to rewrite it to avoid window functions, aggregate functions, and generator functions in the WHERE clause.",
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 1501e14c6045e..4124e958e3907 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -353,6 +353,7 @@ Below is a list of all the keywords in Spark SQL. |AFTER|non-reserved|non-reserved|non-reserved| |ALL|reserved|non-reserved|reserved| |ALTER|non-reserved|non-reserved|reserved| +|ALWAYS|non-reserved|non-reserved|non-reserved| |ANALYZE|non-reserved|non-reserved|non-reserved| |AND|reserved|non-reserved|reserved| |ANTI|non-reserved|strict-non-reserved|non-reserved| @@ -451,6 +452,7 @@ Below is a list of all the keywords in Spark SQL. |FULL|reserved|strict-non-reserved|reserved| |FUNCTION|non-reserved|non-reserved|reserved| |FUNCTIONS|non-reserved|non-reserved|non-reserved| +|GENERATED|non-reserved|non-reserved|non-reserved| |GLOBAL|non-reserved|non-reserved|reserved| |GRANT|reserved|non-reserved|reserved| |GROUP|reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 38f52901aa2a3..6d0862290cf56 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -91,6 +91,7 @@ ADD: 'ADD'; AFTER: 'AFTER'; ALL: 'ALL'; ALTER: 'ALTER'; +ALWAYS: 'ALWAYS'; ANALYZE: 'ANALYZE'; AND: 'AND'; ANTI: 'ANTI'; @@ -189,6 +190,7 @@ FROM: 'FROM'; FULL: 'FULL'; FUNCTION: 'FUNCTION'; FUNCTIONS: 'FUNCTIONS'; +GENERATED: 'GENERATED'; GLOBAL: 'GLOBAL'; GRANT: 'GRANT'; GROUP: 'GROUP';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 7c073411188a7..aa5f538bbf6d4 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -1035,9 +1035,14 @@ createOrReplaceTableColType colDefinitionOption : NOT NULL | defaultExpression + | generationExpression | commentSpec ; +generationExpression + : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN + ; + complexColTypeList : complexColType (COMMA complexColType)* ; @@ -1183,6 +1188,7 @@ ansiNonReserved : ADD | AFTER | ALTER + | ALWAYS | ANALYZE | ANTI | ANY_VALUE @@ -1252,6 +1258,7 @@ ansiNonReserved | FORMATTED | FUNCTION
| FUNCTIONS + | GENERATED | GLOBAL | GROUPING | HOUR @@ -1441,6 +1448,7 @@ nonReserved | AFTER | ALL | ALTER + | ALWAYS | ANALYZE | AND | ANY @@ -1535,6 +1543,7 @@ nonReserved | FROM | FUNCTION | FUNCTIONS + | GENERATED | GLOBAL | GRANT | GROUP diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java index d2c8f25e73904..b191438dbc3ee 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -33,6 +33,8 @@ * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in * {@link Table#columns()} by calling the static {@code create} functions of this interface to * create it. + *
<p>
+ * A column cannot have both a default value and a generation expression. */ @Evolving public interface Column { @@ -42,7 +44,16 @@ static Column create(String name, DataType dataType) { } static Column create(String name, DataType dataType, boolean nullable) { - return create(name, dataType, nullable, null, null, null); + return create(name, dataType, nullable, null, null); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, null, null, metadataInJSON); } static Column create( @@ -52,7 +63,18 @@ static Column create( String comment, ColumnDefaultValue defaultValue, String metadataInJSON) { - return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON); + return new ColumnImpl(name, dataType, nullable, comment, defaultValue, null, metadataInJSON); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + String generationExpression, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, null, + generationExpression, metadataInJSON); } /** @@ -82,6 +104,15 @@ static Column create( @Nullable ColumnDefaultValue defaultValue(); + /** + * Returns the generation expression of this table column. Null means no generation expression. + *
<p>
+ * The generation expression is stored in Spark SQL dialect. It is up to the data source to verify + * expression compatibility and reject writes as necessary. */ + @Nullable + String generationExpression(); + /** * Returns the column metadata in JSON format. */
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 82622d65205ec..eb442ad38bde5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -25,7 +25,9 @@ import org.apache.spark.sql.errors.QueryCompilationErrors; import org.apache.spark.sql.types.StructType; +import java.util.Collections; import java.util.Map; +import java.util.Set; /** * Catalog methods for working with Tables. @@ -78,6 +80,11 @@ public interface TableCatalog extends CatalogPlugin { */ String OPTION_PREFIX = "option."; + /** + * @return the set of capabilities for this TableCatalog + */ + default Set<TableCatalogCapability> capabilities() { return Collections.emptySet(); } + /** * List the tables in a namespace from the catalog. *
<p>
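The new `capabilities()` default above pairs with the `TableCatalogCapability` enum introduced in the next file. A minimal sketch of how a connector would opt in (not part of this patch; it mirrors the `InMemoryTableCatalog` change in the test sources further down, and `MyCatalog` is a hypothetical name built on the test-only `BasicInMemoryTableCatalog`):

```scala
import java.util.{Set => JSet}

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, TableCatalogCapability}

// A catalog advertises generated-column support by overriding the new default
// method, which otherwise returns an empty set. Without the capability, any
// CREATE/REPLACE TABLE statement with a GENERATED ALWAYS AS column fails
// during analysis.
class MyCatalog extends BasicInMemoryTableCatalog {
  override def capabilities(): JSet[TableCatalogCapability] =
    Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
}
```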
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java new file mode 100644 index 0000000000000..84a2a0f76481e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; + +/** + * Capabilities that can be provided by a {@link TableCatalog} implementation. + *
<p>
+ * TableCatalogs use {@link TableCatalog#capabilities()} to return a set of capabilities. Each + * capability signals to Spark that the catalog supports a feature identified by the capability. + * For example, returning {@link #SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS} allows Spark to + * accept {@code GENERATED ALWAYS AS} expressions in {@code CREATE TABLE} statements. + * + * @since 3.4.0 + */ +@Evolving +public enum TableCatalogCapability { + + /** + * Signals that the TableCatalog supports defining generated columns upon table creation in SQL. + *
<p>
+ * Without this capability, any create/replace table statements with a generated column defined + * in the table schema will throw an exception during analysis. + *
<p>
+ * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + *
<p>
+ * Generation expressions are included in the column definition for APIs like + * {@link TableCatalog#createTable}. + * See {@link Column#generationExpression()}. */ + SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8957794ad95f2..1c5eac4ce17d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, ResolveDefaultColumns} +import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, GeneratedColumn, IntervalUtils, ResolveDefaultColumns} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -3002,6 +3002,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit // Check that no duplicates exist among any CREATE TABLE column options specified. var nullable = true var defaultExpression: Option[DefaultExpressionContext] = None + var generationExpression: Option[GenerationExpressionContext] = None var commentSpec: Option[CommentSpecContext] = None ctx.colDefinitionOption().asScala.foreach { option => if (option.NULL != null) { @@ -3018,6 +3019,13 @@ } defaultExpression = Some(expr) } + Option(option.generationExpression()).foreach { expr => + if (generationExpression.isDefined) { + throw QueryParsingErrors.duplicateCreateTableColumnOption( + option, colName.getText, "GENERATED ALWAYS AS") + } + generationExpression = Some(expr) + } Option(option.commentSpec()).foreach { spec => if (commentSpec.isDefined) { throw QueryParsingErrors.duplicateCreateTableColumnOption( @@ -3042,6 +3050,11 @@ throw QueryParsingErrors.defaultColumnNotEnabledError(ctx) } } + // Add the 'GENERATED ALWAYS AS expression' clause in the column definition, if any, to the + // column metadata. + generationExpression.map(visitGenerationExpression).foreach { field => + builder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, field) + } val name: String = colName.getText @@ -3100,11 +3113,7 @@ string(visitStringLit(ctx.stringLit)) } - /** - * Create a default string. - */ - override def visitDefaultExpression(ctx: DefaultExpressionContext): String = withOrigin(ctx) { - val exprCtx = ctx.expression() + private def verifyAndGetExpression(exprCtx: ExpressionContext): String = { // Make sure it can be converted to Catalyst expressions. expression(exprCtx) // Extract the raw expression text so that we can save the user provided text.
We don't @@ -3116,6 +3125,22 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit exprCtx.getStart.getInputStream.getText(new Interval(start, end)) } + /** + * Create a default string. + */ + override def visitDefaultExpression(ctx: DefaultExpressionContext): String = + withOrigin(ctx) { + verifyAndGetExpression(ctx.expression()) + } + + /** + * Create a generation expression string. + */ + override def visitGenerationExpression(ctx: GenerationExpressionContext): String = + withOrigin(ctx) { + verifyAndGetExpression(ctx.expression()) + } + /** * Create an optional comment string. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala new file mode 100644 index 0000000000000..6ff5df98d3ccf --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression} +import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project} +import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog +import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, StructField, StructType} + +/** + * This object contains utility methods and values for Generated Columns + */ +object GeneratedColumn { + + /** + * The metadata key for saving a generation expression in a generated column's metadata. This is + * only used internally and connectors should access generation expressions from the V2 columns. 
+ */ + val GENERATION_EXPRESSION_METADATA_KEY = "GENERATION_EXPRESSION" + + /** Parser for parsing generation expression SQL strings */ + private lazy val parser = new CatalystSqlParser() + + /** + * Whether the given `field` is a generated column + */ + def isGeneratedColumn(field: StructField): Boolean = { + field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY) + } + + /** + * Returns the generation expression stored in the column metadata if it exists + */ + def getGenerationExpression(field: StructField): Option[String] = { + if (isGeneratedColumn(field)) { + Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY)) + } else { + None + } + } + + /** + * Whether the `schema` has one or more generated columns + */ + def hasGeneratedColumns(schema: StructType): Boolean = { + schema.exists(isGeneratedColumn) + } + + /** + * Parse and analyze `expressionStr` and perform verification. This means: + * - The expression cannot reference itself + * - The expression cannot reference other generated columns + * - No user-defined expressions + * - The expression must be deterministic + * - The expression data type can be safely up-cast to the destination column data type + * - No subquery expressions + * + * Throws an [[AnalysisException]] if the expression cannot be converted or is an invalid + * generation expression according to the above rules. + */ + private def analyzeAndVerifyExpression( + expressionStr: String, + fieldName: String, + dataType: DataType, + schema: StructType, + statementType: String): Unit = { + def unsupportedExpressionError(reason: String): AnalysisException = { + new AnalysisException( + errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN", + messageParameters = Map( + "fieldName" -> fieldName, + "expressionStr" -> expressionStr, + "reason" -> reason)) + } + + // Parse the expression string + val parsed: Expression = try { + parser.parseExpression(expressionStr) + } catch { + case ex: ParseException => + // Shouldn't be possible since we check that the expression is a valid catalyst expression + // during parsing + throw SparkException.internalError( + s"Failed to execute $statementType command because the column $fieldName has " + + s"generation expression $expressionStr which fails to parse as a valid expression:" + + s"\n${ex.getMessage}") + } + // Don't allow subquery expressions + if (parsed.containsPattern(PLAN_EXPRESSION)) { + throw unsupportedExpressionError("subquery expressions are not allowed for generated columns") + } + // Analyze the parsed result + val allowedBaseColumns = schema + .filterNot(_.name == fieldName) // Can't reference itself + .filterNot(isGeneratedColumn) // Can't reference other generated columns + val relation = new LocalRelation(StructType(allowedBaseColumns).toAttributes) + val plan = try { + val analyzer: Analyzer = GeneratedColumnAnalyzer + val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), relation)) + analyzer.checkAnalysis(analyzed) + analyzed + } catch { + case ex: AnalysisException => + // Improve error message if possible + if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") { + ex.messageParameters.get("objectName").foreach { unresolvedCol => + val resolver = SQLConf.get.resolver + // Whether `col` = `unresolvedCol` taking into account case-sensitivity + def isUnresolvedCol(col: String) = + resolver(unresolvedCol, QueryCompilationErrors.toSQLId(col)) + // Check whether the unresolved column is this column + if (isUnresolvedCol(fieldName)) { + throw 
unsupportedExpressionError("generation expression cannot reference itself") + } + // Check whether the unresolved column is another generated column in the schema + if (schema.exists(col => isGeneratedColumn(col) && isUnresolvedCol(col.name))) { + throw unsupportedExpressionError( + "generation expression cannot reference another generated column") + } + } + } + if (ex.getErrorClass == "UNRESOLVED_ROUTINE") { + // Cannot resolve function using built-in catalog + ex.messageParameters.get("routineName").foreach { fnName => + throw unsupportedExpressionError(s"failed to resolve $fnName to a built-in function") + } + } + throw ex + } + val analyzed = plan.collectFirst { + case Project(Seq(a: Alias), _: LocalRelation) => a.child + }.get + if (!analyzed.deterministic) { + throw unsupportedExpressionError("generation expression is not deterministic") + } + if (!Cast.canUpCast(analyzed.dataType, dataType)) { + throw unsupportedExpressionError( + s"generation expression data type ${analyzed.dataType.simpleString} " + + s"is incompatible with column data type ${dataType.simpleString}") + } + } + + /** + * For any generated columns in `schema`, parse, analyze and verify the generation expression. + */ + private def verifyGeneratedColumns(schema: StructType, statementType: String): Unit = { + schema.foreach { field => + getGenerationExpression(field).foreach { expressionStr => + analyzeAndVerifyExpression(expressionStr, field.name, field.dataType, schema, statementType) + } + } + } + + /** + * If `schema` contains any generated columns: + * 1) Check whether the table catalog supports generated columns. Otherwise throw an error. + * 2) Parse, analyze and verify the generation expressions for any generated columns. + */ + def validateGeneratedColumns( + schema: StructType, + catalog: TableCatalog, + ident: Seq[String], + statementType: String): Unit = { + if (hasGeneratedColumns(schema)) { + if (!catalog.capabilities().contains( + TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) { + throw QueryCompilationErrors.generatedColumnsUnsupported(ident) + } + GeneratedColumn.verifyGeneratedColumns(schema, statementType) + } + } +} + +/** + * Analyzer for processing generated column expressions using built-in functions only. 
+ */ +object GeneratedColumnAnalyzer extends Analyzer( + new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) { +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 9b481356fa603..12a8db9236357 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,9 +22,11 @@ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.util.GeneratedColumn import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -471,43 +473,62 @@ private[sql] object CatalogV2Util { /** * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column - * comment and default value. This is mainly used to generate DS v2 columns from table schema in - * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable and related APIs. + * comment and default value or generation expression. This is mainly used to generate DS v2 + * columns from table schema in DDL commands, so that Spark can pass DS v2 columns to DS v2 + * createTable and related APIs. 
*/ def structTypeToV2Columns(schema: StructType): Array[Column] = { schema.fields.map(structFieldToV2Column) } private def structFieldToV2Column(f: StructField): Column = { - def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata): Column = { - val metadataJSON = if (metadata == Metadata.empty) { + def metadataAsJson(metadata: Metadata): String = { + if (metadata == Metadata.empty) { null } else { metadata.json } - Column.create( - f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, metadataJSON) } - if (f.getCurrentDefaultValue().isDefined && f.getExistenceDefaultValue().isDefined) { + def metadataWithKeysRemoved(keys: Seq[String]): Metadata = { + keys.foldLeft(new MetadataBuilder().withMetadata(f.metadata)) { + (builder, key) => builder.remove(key) + }.build() + } + + val isDefaultColumn = f.getCurrentDefaultValue().isDefined && + f.getExistenceDefaultValue().isDefined + val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f) + if (isDefaultColumn && isGeneratedColumn) { + throw new AnalysisException( + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> f.name, + "defaultValue" -> f.getCurrentDefaultValue().get, + "genExpr" -> GeneratedColumn.getGenerationExpression(f).get + ) + ) + } + + if (isDefaultColumn) { val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY) assert(e.resolved && e.foldable, "The existence default value must be a simple SQL string that is resolved and foldable, " + "but got: " + f.getExistenceDefaultValue().get) val defaultValue = new ColumnDefaultValue( f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType)) - val cleanedMetadata = new MetadataBuilder() - .withMetadata(f.metadata) - .remove("comment") - .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY) - .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY) - .build() - createV2Column(defaultValue, cleanedMetadata) + val cleanedMetadata = metadataWithKeysRemoved( + Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, + metadataAsJson(cleanedMetadata)) + } else if (isGeneratedColumn) { + val cleanedMetadata = metadataWithKeysRemoved( + Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY)) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, + GeneratedColumn.getGenerationExpression(f).get, metadataAsJson(cleanedMetadata)) } else { - val cleanedMetadata = new MetadataBuilder() - .withMetadata(f.metadata) - .remove("comment") - .build() - createV2Column(null, cleanedMetadata) + val cleanedMetadata = metadataWithKeysRemoved(Seq("comment")) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, + metadataAsJson(cleanedMetadata)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 556b3a62da3d6..1c257966aaf4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3405,6 +3405,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } } + def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + messageParameters = Map( + "tableName" -> toSQLId(nameParts), + "operation" -> 
"generated columns" + ) + ) + } + def ambiguousLateralColumnAliasError(name: String, numOfMatches: Int): Throwable = { new AnalysisException( errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala index 5ab3f83eeae56..2a67ffc4bbef5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala @@ -27,4 +27,5 @@ case class ColumnImpl( nullable: Boolean, comment: String, defaultValue: ColumnDefaultValue, + generationExpression: String, metadataInJSON: String) extends Column diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 0efbd75ad93d0..5196d19ffcd9b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkThrowable import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _} -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket @@ -2717,4 +2717,49 @@ class DDLParserSuite extends AnalysisTest { context = ExpectedContext( fragment = "b STRING COMMENT \"abc\" NOT NULL COMMENT \"abc\"", start = 27, stop = 71)) } + + test("SPARK-41290: implement parser support for GENERATED ALWAYS AS columns in tables") { + val schemaWithGeneratedColumn = new StructType() + .add("a", IntegerType, true) + .add("b", IntegerType, false, + new MetadataBuilder() + .putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, "a+1") + .build()) + comparePlans(parsePlan( + "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), + CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, + Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), + Map.empty[String, String], None, None, None, false), false)) + comparePlans(parsePlan( + "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), + ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, + Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), + Map.empty[String, String], None, None, None, false), false)) + // Two generation expressions + checkError( + exception = parseException("CREATE TABLE my_tab(a INT, " + + "b INT GENERATED ALWAYS AS (a + 1) GENERATED ALWAYS AS (a + 2)) USING PARQUET"), + errorClass = "CREATE_TABLE_COLUMN_OPTION_DUPLICATE", + parameters = Map("columnName" -> "b", "optionName" -> "GENERATED ALWAYS AS"), + context = ExpectedContext( + fragment = "b INT GENERATED ALWAYS 
AS (a + 1) GENERATED ALWAYS AS (a + 2)", + start = 27, + stop = 87 + ) + ) + // Empty expression + checkError( + exception = parseException( + "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS ()) USING PARQUET"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map("error" -> "')'", "hint" -> "") + ) + // No parenthesis + checkError( + exception = parseException( + "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING PARQUET"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map("error" -> "'a'", "hint" -> ": missing '('") + ) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 50bea2b8d2f27..e82f203742b17 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -170,6 +170,11 @@ class BasicInMemoryTableCatalog extends TableCatalog { } class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces { + + override def capabilities: java.util.Set[TableCatalogCapability] = { + Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava + } + protected def allNamespaces: Seq[Seq[String]] = { (tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 8b985e82963e8..74539b54117f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 -import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.SupportsRead import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue} @@ -136,6 +136,13 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] { val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( tableDesc.schema, tableDesc.provider, "CREATE TABLE", false) + + if (GeneratedColumn.hasGeneratedColumns(newSchema)) { + throw QueryCompilationErrors.generatedColumnsUnsupported( + Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get, + tableDesc.identifier.table)) + } + val newTableDesc = tableDesc.copy(schema = newSchema) CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b45de06371cfe..71ffe65b42a81 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns import org.apache.spark.sql.connector.catalog.index.SupportsIndex @@ -178,6 +178,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) + GeneratedColumn.validateGeneratedColumns( + newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE") + CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema), partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil @@ -201,6 +204,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) + GeneratedColumn.validateGeneratedColumns( + newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE") + val v2Columns = structTypeToV2Columns(newSchema) catalog match { case staging: StagingTableCatalog => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c4dabaec8880b..85984f1b2a835 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1422,6 +1422,228 @@ class DataSourceV2SQLSuiteV1Filter } } + test("SPARK-41290: Generated columns only allowed with TableCatalogs that " + + "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)))" + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + // InMemoryTableCatalog.capabilities() = {SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS} + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + // Can create table with a generated column + sql(s"$statement testcat.$tableDefinition USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + // BasicInMemoryTableCatalog.capabilities() = {} + withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) { + checkError( + exception = intercept[AnalysisException] { + sql("USE dummy") + sql(s"$statement dummy.$tableDefinition USING 
foo") + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map( + "tableName" -> "`my_tab`", + "operation" -> "generated columns" + ) + ) + } + } + } + + test("SPARK-41290: Column cannot have both a generation expression and a default value") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)) DEFAULT 0)" + withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") { + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + checkError( + exception = intercept[AnalysisException] { + sql(s"$statement testcat.$tableDefinition USING foo") + }, + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + parameters = Map( + "colName" -> "eventYear", + "defaultValue" -> "0", + "genExpr" -> "year(eventDate)") + ) + } + } + } + } + + test("SPARK-41290: Generated column expression must be valid generation expression") { + val tblName = "my_tab" + def checkUnsupportedGenerationExpression( + expr: String, + expectedReason: String, + genColType: String = "INT", + customTableDef: Option[String] = None): Unit = { + val tableDef = + s"CREATE TABLE testcat.$tblName(a INT, b $genColType GENERATED ALWAYS AS ($expr)) USING foo" + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(customTableDef.getOrElse(tableDef)) + }, + errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN", + parameters = Map( + "fieldName" -> "b", + "expressionStr" -> expr, + "reason" -> expectedReason) + ) + } + } + + // Expression cannot be resolved since it doesn't exist + checkUnsupportedGenerationExpression( + "not_a_function(a)", + "failed to resolve `not_a_function` to a built-in function" + ) + + // Expression cannot be resolved since it's not a built-in function + spark.udf.register("timesTwo", (x: Int) => x * 2) + checkUnsupportedGenerationExpression( + "timesTwo(a)", + "failed to resolve `timesTwo` to a built-in function" + ) + + // Generated column can't reference itself + checkUnsupportedGenerationExpression( + "b + 1", + "generation expression cannot reference itself" + ) + // Obeys case sensitivity when intercepting the error message + // Intercepts when case-insensitive + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkUnsupportedGenerationExpression( + "B + 1", + "generation expression cannot reference itself" + ) + } + // Doesn't intercept when case-sensitive + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (B + 1)) USING foo") + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`B`", "proposal" -> "`a`"), + context = ExpectedContext(fragment = "B", start = 0, stop = 0) + ) + } + } + // Respects case sensitivity when resolving + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(" + + "a INT, b INT GENERATED ALWAYS AS (B + 1), B INT) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + } + + // Generated column can't reference other generated columns + checkUnsupportedGenerationExpression( + "c + 1", + "generation expression cannot reference another generated column", + customTableDef 
= Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (c + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + // Respects case-insensitivity + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkUnsupportedGenerationExpression( + "C + 1", + "generation expression cannot reference another generated column", + customTableDef = Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (C + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + checkUnsupportedGenerationExpression( + "c + 1", + "generation expression cannot reference another generated column", + customTableDef = Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (c + 1), C INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + } + // Respects case sensitivity when resolving + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(" + + "a INT, A INT GENERATED ALWAYS AS (a + 1), b INT GENERATED ALWAYS AS (a + 1)) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + } + + // Generated column can't reference non-existent column + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS (c + 1)) USING foo") + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`c`", "proposal" -> "`a`"), + context = ExpectedContext(fragment = "c", start = 0, stop = 0) + ) + } + + // Expression must be deterministic + checkUnsupportedGenerationExpression( + "rand()", + "generation expression is not deterministic" + ) + + // Data type is incompatible + checkUnsupportedGenerationExpression( + "a + 1", + "generation expression data type int is incompatible with column data type boolean", + "BOOLEAN" + ) + // But we allow valid up-casts + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(a INT, b LONG GENERATED ALWAYS AS (a + 1)) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + + // No subquery expressions + checkUnsupportedGenerationExpression( + "(SELECT 1)", + "subquery expressions are not allowed for generated columns" + ) + checkUnsupportedGenerationExpression( + "(SELECT (SELECT 2) + 1)", // nested + "subquery expressions are not allowed for generated columns" + ) + checkUnsupportedGenerationExpression( + "(SELECT 1) + a", // refers to another column + "subquery expressions are not allowed for generated columns" + ) + withTable("other") { + sql("create table other(x INT) using parquet") + checkUnsupportedGenerationExpression( + "(select min(x) from other)", // refers to another table + "subquery expressions are not allowed for generated columns" + ) + } + checkUnsupportedGenerationExpression( + "(select min(x) from faketable)", // refers to a non-existent table + "subquery expressions are not allowed for generated columns" + ) + } + test("ShowCurrentNamespace: basic tests") { def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = { val schema = new StructType() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3d88d4f7ab9e3..4f3f993d7de2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2181,6 +2181,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { assert(spark.sessionState.catalog.isRegisteredFunction(rand)) } } + + test("SPARK-41290: No generated columns with V1") { + checkError( + exception = intercept[AnalysisException] { + sql(s"create table t(a int, b int generated always as (a + 1)) using parquet") + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`", + "operation" -> "generated columns") + ) + } } object FakeLocalFsFileSystem {
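Taken together, these changes give the following end-to-end behavior. A sketch, assuming a catalog named `testcat` backed by the `InMemoryTableCatalog` test implementation above; the table names and the `foo` provider are illustrative, matching the suites:

```scala
// Register a catalog whose capabilities() reports
// SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS.
spark.conf.set("spark.sql.catalog.testcat",
  classOf[org.apache.spark.sql.connector.catalog.InMemoryTableCatalog].getName)

// Accepted: year(eventDate) is a deterministic built-in expression with no
// subqueries, it resolves against the non-generated columns only, and its INT
// result safely up-casts to the declared column type. The raw expression text
// is stored under the GENERATION_EXPRESSION metadata key and surfaced to the
// connector through Column.generationExpression().
spark.sql(
  """CREATE TABLE testcat.events(
    |  eventDate DATE,
    |  eventYear INT GENERATED ALWAYS AS (year(eventDate))
    |) USING foo""".stripMargin)

// Rejected during analysis with UNSUPPORTED_EXPRESSION_GENERATED_COLUMN:
// rand() is non-deterministic.
spark.sql(
  "CREATE TABLE testcat.bad(a INT, b DOUBLE GENERATED ALWAYS AS (rand())) USING foo")
```

V1 tables take the `DataSourceStrategy` path instead and reject generated columns outright with `UNSUPPORTED_FEATURE.TABLE_OPERATION`, as the `DDLSuite` test above verifies.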