diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d991e7cf7e89..0a142c29a16f 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,14 @@ statement
         (AS? query)?                                                   #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier locationSpec?                      #createTableLike
+    | replaceTableHeader ('(' colTypeList ')')? tableProvider
+        ((OPTIONS options=tablePropertyList) |
+        (PARTITIONED BY partitioning=transformList) |
+        bucketSpec |
+        locationSpec |
+        (COMMENT comment=STRING) |
+        (TBLPROPERTIES tableProps=tablePropertyList))*
+        (AS? query)?                                                   #replaceTable
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
         (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
     | ALTER TABLE multipartIdentifier
@@ -261,6 +269,10 @@ createTableHeader
     : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
     ;
 
+replaceTableHeader
+    : (CREATE OR)? REPLACE TABLE multipartIdentifier
+    ;
+
 bucketSpec
     : CLUSTERED BY identifierList
       (SORTED BY orderedIdentifierList)?
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java
new file mode 100644
index 000000000000..fc055e91a6ac
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import java.util.Map;
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.sources.v2.StagedTable;
+import org.apache.spark.sql.sources.v2.SupportsWrite;
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An optional mix-in for implementations of {@link TableCatalog} that support staging the creation
+ * of a table before committing the table's metadata along with its contents in CREATE TABLE AS
+ * SELECT or REPLACE TABLE AS SELECT operations.
+ *
+ * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
+ * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
+ * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
+ * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
+ * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
+ * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
+ * write operation fails, the catalog will have already dropped the table, and the planner cannot
+ * roll back the dropping of the table.
+ *
+ * If the catalog implements this plugin, the catalog can implement the methods to "stage" the
+ * creation and the replacement of a table. After the table's
+ * {@link BatchWrite#commit(WriterCommitMessage[])} is called,
+ * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
+ * complete both the data write and the metadata swap operation atomically.
+ */
+public interface StagingTableCatalog extends TableCatalog {
+
+ /**
+ * Stage the creation of a table, preparing it to be committed into the metastore.
+ *
+ * When the table is committed, the contents of any writes performed by the Spark planner are
+ * committed along with the metadata about the table passed into this method's arguments. If the
+ * table exists when this method is called, the method should throw an exception accordingly. If
+ * another process concurrently creates the table before this table's staged changes are
+ * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param partitions transforms to use for partitioning data in the table
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws TableAlreadyExistsException If a table or view already exists for the identifier
+ * @throws UnsupportedOperationException If a requested partition transform is not supported
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ */
+ StagedTable stageCreate(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
+ /**
+ * Stage the replacement of a table, preparing it to be committed into the metastore when the
+ * returned table's {@link StagedTable#commitStagedChanges()} is called.
+ *
+ * When the table is committed, the contents of any writes performed by the Spark planner are
+ * committed along with the metadata about the table passed into this method's arguments. If the
+ * table exists, the metadata and the contents of this table replace the metadata and contents of
+ * the existing table. If a concurrent process commits changes to the table's data or metadata
+ * while the write is being performed but before the staged changes are committed, the catalog
+ * can decide whether to move forward with the table replacement anyways or abort the commit
+ * operation.
+ *
+ * If the table does not exist, committing the staged changes should fail with
+ * {@link NoSuchTableException}. This differs from the semantics of
+ * {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create
+ * the table in the data source if the table does not exist at the time of committing the
+ * operation.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param partitions transforms to use for partitioning data in the table
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws UnsupportedOperationException If a requested partition transform is not supported
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ * @throws NoSuchTableException If the table does not exist
+ */
+ StagedTable stageReplace(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
+
+ /**
+ * Stage the creation or replacement of a table, preparing it to be committed into the metastore
+ * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
+ *
+ * When the table is committed, the contents of any writes performed by the Spark planner are
+ * committed along with the metadata about the table passed into this method's arguments. If the
+ * table exists, the metadata and the contents of this table replace the metadata and contents of
+ * the existing table. If a concurrent process commits changes to the table's data or metadata
+ * while the write is being performed but before the staged changes are committed, the catalog
+ * can decide whether to move forward with the table replacement anyways or abort the commit
+ * operation.
+ *
+ * If the table does not exist when the changes are committed, the table should be created in the
+ * backing data source. This differs from the expected semantics of
+ * {@link #stageReplace(Identifier, StructType, Transform[], Map)}, which should fail when
+ * the staged changes are committed but the table doesn't exist at commit time.
+ *
+ * @param ident a table identifier
+ * @param schema the schema of the new table, as a struct type
+ * @param partitions transforms to use for partitioning data in the table
+ * @param properties a string map of table properties
+ * @return metadata for the new table
+ * @throws UnsupportedOperationException If a requested partition transform is not supported
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ */
+ StagedTable stageCreateOrReplace(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties) throws NoSuchNamespaceException;
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java
new file mode 100644
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java
+/**
+ * Represents a table which is staged for being committed to the metastore.
+ *
+ * This is used to implement atomic CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT queries. The
+ * planner will create one of these via
+ * {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
+ * {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
+ * table for being written to. This table should usually implement {@link SupportsWrite}. A new
+ * writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
+ * and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
+ * at which point implementations are expected to commit the table's metadata into the metastore
+ * along with the data that was written by the writes from the write builder this table created.
+ */
+public interface StagedTable extends Table {
+
+ /**
+ * Finalize the creation or replacement of this table.
+ */
+ void commitStagedChanges();
+
+ /**
+ * Abort the changes that were staged, both in metadata and from temporary outputs of this
+ * table's writers.
+ */
+ void abortStagedChanges();
+}
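The two interfaces above define a two-phase protocol: stage the table, run the write against the staged table, then either commit or abort the staged changes. A minimal caller-side sketch of that protocol follows; it is not part of this patch, and `stagingCatalog`, `ident`, `schema`, `partitions`, and `props` are placeholder names assumed to be in scope with the types expected by stageReplace.

```scala
// Hedged sketch of the staging protocol; the names above are placeholders, not part of the patch.
val staged: StagedTable = stagingCatalog.stageReplace(ident, schema, partitions, props)
try {
  // ... run the batch write against `staged` via SupportsWrite#newWriteBuilder(...) ...
  staged.commitStagedChanges()   // the metadata swap and the written data become visible together
} catch {
  case t: Throwable =>
    staged.abortStagedChanges()  // discard staged metadata and any temporary write output
    throw t
}
```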
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
new file mode 100644
index 000000000000..3036f7c21093
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.Identifier
+
+class CannotReplaceMissingTableException(
+ tableIdentifier: Identifier,
+ cause: Option[Throwable] = None)
+ extends AnalysisException(
+ s"Table $tableIdentifier cannot be replaced as it did not exist." +
+ s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d9f8b9a7203f..e599128f8310 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -2127,6 +2127,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
}
+ /**
+ * Validate a replace table statement and return the [[TableHeader]].
+ */
+ override def visitReplaceTableHeader(
+ ctx: ReplaceTableHeaderContext): TableHeader = withOrigin(ctx) {
+ val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText)
+ (multipartIdentifier, false, false, false)
+ }
+
/**
* Parse a qualified name to a multipart name.
*/
@@ -2294,6 +2303,69 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}
+ /**
+ * Replace a table, returning a [[ReplaceTableStatement]] logical plan.
+ *
+ * Expected format:
+ * {{{
+ * [CREATE OR] REPLACE TABLE [db_name.]table_name
+ * USING table_provider
+ * replace_table_clauses
+ * [[AS] select_statement];
+ *
+ * replace_table_clauses (order insensitive):
+ * [OPTIONS table_property_list]
+ * [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
+ * [CLUSTERED BY (col_name, col_name, ...)
+ * [SORTED BY (col_name [ASC|DESC], ...)]
+ * INTO num_buckets BUCKETS
+ * ]
+ * [LOCATION path]
+ * [COMMENT table_comment]
+ * [TBLPROPERTIES (property_name=property_value, ...)]
+ * }}}
+ */
+ override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
+ val (table, _, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
+ if (external) {
+ operationNotAllowed("REPLACE EXTERNAL TABLE ... USING", ctx)
+ }
+
+ checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
+ checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
+ checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
+ checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
+ checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+ checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
+
+ val schema = Option(ctx.colTypeList()).map(createSchema)
+ val partitioning: Seq[Transform] =
+ Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
+ val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
+ val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
+ val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+ val provider = ctx.tableProvider.qualifiedName.getText
+ val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
+ val comment = Option(ctx.comment).map(string)
+ val orCreate = ctx.replaceTableHeader().CREATE() != null
+
+ Option(ctx.query).map(plan) match {
+ case Some(_) if schema.isDefined =>
+ operationNotAllowed(
+ "Schema may not be specified in a Replace Table As Select (RTAS) statement",
+ ctx)
+
+ case Some(query) =>
+ ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
+ provider, options, location, comment, orCreate = orCreate)
+
+ case _ =>
+ ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
+ bucketSpec, properties, provider, options, location, comment, orCreate = orCreate)
+ }
+ }
+
/**
* Create a [[DropTableStatement]] command.
*/
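For reference, these are the two parse paths the new visitReplaceTable rule produces, matching the expected format documented above. This is only a sketch: `parser` stands for a Catalyst SQL parser instance such as the one exercised in DDLParserSuite, and the table and source names are illustrative.

```scala
// No AS SELECT clause, so this parses to a ReplaceTableStatement:
parser.parsePlan("REPLACE TABLE my_tab (a INT, b STRING) USING parquet PARTITIONED BY (a)")
// AS SELECT clause present, so this parses to a ReplaceTableAsSelectStatement,
// with orCreate = true because of the CREATE OR REPLACE header:
parser.parsePlan("CREATE OR REPLACE TABLE my_tab USING parquet AS SELECT * FROM src")
```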
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 2cb04c9ec70c..2698ba282f96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -441,6 +441,47 @@ case class CreateTableAsSelect(
}
}
+/**
+ * Replace a table with a v2 catalog.
+ *
+ * If the table does not exist, and orCreate is true, then it will be created.
+ * If the table does not exist, and orCreate is false, then an exception will be thrown.
+ *
+ * The persisted table will have no contents as a result of this operation.
+ */
+case class ReplaceTable(
+ catalog: TableCatalog,
+ tableName: Identifier,
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ properties: Map[String, String],
+ orCreate: Boolean) extends Command
+
+/**
+ * Replaces a table from a select query with a v2 catalog.
+ *
+ * If the table does not exist, and orCreate is true, then it will be created.
+ * If the table does not exist, and orCreate is false, then an exception will be thrown.
+ */
+case class ReplaceTableAsSelect(
+ catalog: TableCatalog,
+ tableName: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ properties: Map[String, String],
+ writeOptions: Map[String, String],
+ orCreate: Boolean) extends Command {
+
+ override def children: Seq[LogicalPlan] = Seq(query)
+
+ override lazy val resolved: Boolean = {
+ // the table schema is created from the query schema, so the only resolution needed is to check
+ // that the columns referenced by the table's partitioning exist in the query schema
+ val references = partitioning.flatMap(_.references).toSet
+ references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
+ }
+}
+
/**
* Append data to an existing table.
*/
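The `resolved` check in ReplaceTableAsSelect above only verifies that every column referenced by the partitioning exists in the query schema. A worked instance of that check, as a sketch with illustrative values that reuses the expression classes from this patch:

```scala
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.types.{LongType, StructType, TimestampType}

val partitioning: Seq[Transform] = Seq(IdentityTransform(FieldReference("ts")))
val querySchema = new StructType().add("id", LongType).add("ts", TimestampType)
// Mirrors the resolved check: true here because the query schema contains a "ts" field.
val resolved = partitioning.flatMap(_.references).toSet
  .forall(ref => querySchema.findNestedField(ref.fieldNames).isDefined)
```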
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala
new file mode 100644
index 000000000000..2808892b089b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A REPLACE TABLE command, as parsed from SQL.
+ *
+ * If the table exists prior to running this command, executing this statement
+ * will replace the table's metadata and clear the underlying rows from the table.
+ */
+case class ReplaceTableStatement(
+ tableName: Seq[String],
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ bucketSpec: Option[BucketSpec],
+ properties: Map[String, String],
+ provider: String,
+ options: Map[String, String],
+ location: Option[String],
+ comment: Option[String],
+ orCreate: Boolean) extends ParsedStatement
+
+/**
+ * A REPLACE TABLE AS SELECT command, as parsed from SQL.
+ */
+case class ReplaceTableAsSelectStatement(
+ tableName: Seq[String],
+ asSelect: LogicalPlan,
+ partitioning: Seq[Transform],
+ bucketSpec: Option[BucketSpec],
+ properties: Map[String, String],
+ provider: String,
+ options: Map[String, String],
+ location: Option[String],
+ comment: Option[String],
+ orCreate: Boolean) extends ParsedStatement {
+
+ override def children: Seq[LogicalPlan] = Seq(asSelect)
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index d008b3c78fac..dd84170e2620 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
-import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
+import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
@@ -47,82 +47,71 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(sql), expected, checkAnalysis = false)
}
- test("create table using - schema") {
- val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType()
- .add("a", IntegerType, nullable = true, "test")
- .add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ test("create/replace table using - schema") {
+ val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
+ val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType()
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet",
"no viable alternative at input")
}
- test("create table - with IF NOT EXISTS") {
+ test("create/replace table - with IF NOT EXISTS") {
val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
- }
+ testCreateOrReplaceDdl(
+ sql,
+ TableSpec(
+ Seq("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None),
+ expectedIfNotExists = true)
}
- test("create table - with partitioned by") {
- val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
+ test("create/replace table - with partitioned by") {
+ val createSql = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)"
-
- parsePlan(query) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType()
- .add("a", IntegerType, nullable = true, "test")
- .add("b", StringType))
- assert(create.partitioning == Seq(IdentityTransform(FieldReference("a"))))
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
+ val replaceSql = "REPLACE TABLE my_tab(a INT comment 'test', b STRING) " +
+ "USING parquet PARTITIONED BY (a)"
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType()
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)),
+ Seq(IdentityTransform(FieldReference("a"))),
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - partitioned by transforms") {
- val sql =
+ test("create/replace table - partitioned by transforms") {
+ val createSql =
"""
|CREATE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
|PARTITIONED BY (
@@ -135,154 +124,151 @@ class DDLParserSuite extends AnalysisTest {
| foo(a, "bar", 34))
""".stripMargin
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType()
- .add("a", IntegerType)
- .add("b", StringType)
- .add("ts", TimestampType))
- assert(create.partitioning == Seq(
- IdentityTransform(FieldReference("a")),
- BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
- YearsTransform(FieldReference("ts")),
- MonthsTransform(FieldReference("ts")),
- DaysTransform(FieldReference("ts")),
- HoursTransform(FieldReference("ts")),
- ApplyTransform("foo", Seq(
- FieldReference("a"),
- LiteralValue(UTF8String.fromString("bar"), StringType),
- LiteralValue(34, IntegerType)))))
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ val replaceSql =
+ """
+ |REPLACE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
+ |PARTITIONED BY (
+ | a,
+ | bucket(16, b),
+ | years(ts),
+ | months(ts),
+ | days(ts),
+ | hours(ts),
+ | foo(a, "bar", 34))
+ """.stripMargin
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType()
+ .add("a", IntegerType)
+ .add("b", StringType)
+ .add("ts", TimestampType)),
+ Seq(
+ IdentityTransform(FieldReference("a")),
+ BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
+ YearsTransform(FieldReference("ts")),
+ MonthsTransform(FieldReference("ts")),
+ DaysTransform(FieldReference("ts")),
+ HoursTransform(FieldReference("ts")),
+ ApplyTransform("foo", Seq(
+ FieldReference("a"),
+ LiteralValue(UTF8String.fromString("bar"), StringType),
+ LiteralValue(34, IntegerType)))),
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - with bucket") {
- val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+ test("create/replace table - with bucket") {
+ val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
- parsePlan(query) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b"))))
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $query")
+ val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet " +
+ "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ Seq.empty[Transform],
+ Some(BucketSpec(5, Seq("a"), Seq("b"))),
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - with comment") {
- val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.contains("abc"))
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ test("create/replace table - with comment") {
+ val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
+ val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ Some("abc"))
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - with table properties") {
- val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties == Map("test" -> "test"))
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ test("create/replace table - with table properties") {
+ val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet" +
+ " TBLPROPERTIES('test' = 'test')"
+ val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet" +
+ " TBLPROPERTIES('test' = 'test')"
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ Seq.empty[Transform],
+ None,
+ Map("test" -> "test"),
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - with location") {
- val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("my_tab"))
- assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.contains("/tmp/file"))
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ test("create/replace table - with location") {
+ val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
+ val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(new StructType().add("a", IntegerType).add("b", StringType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ Some("/tmp/file"),
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("create table - byte length literal table name") {
- val sql = "CREATE TABLE 1m.2g(a INT) USING parquet"
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("1m", "2g"))
- assert(create.tableSchema == new StructType().add("a", IntegerType))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ test("create/replace table - byte length literal table name") {
+ val createSql = "CREATE TABLE 1m.2g(a INT) USING parquet"
+ val replaceSql = "REPLACE TABLE 1m.2g(a INT) USING parquet"
+ val expectedTableSpec = TableSpec(
+ Seq("1m", "2g"),
+ Some(new StructType().add("a", IntegerType)),
+ Seq.empty[Transform],
+ None,
+ Map.empty[String, String],
+ "parquet",
+ Map.empty[String, String],
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
- test("Duplicate clauses - create table") {
+ test("Duplicate clauses - create/replace table") {
def createTableHeader(duplicateClause: String): String = {
s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause"
}
+ def replaceTableHeader(duplicateClause: String): String = {
+ s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause"
+ }
+
intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"),
"Found duplicate clauses: TBLPROPERTIES")
intercept(createTableHeader("LOCATION '/tmp/file'"),
@@ -293,31 +279,44 @@ class DDLParserSuite extends AnalysisTest {
"Found duplicate clauses: CLUSTERED BY")
intercept(createTableHeader("PARTITIONED BY (b)"),
"Found duplicate clauses: PARTITIONED BY")
+
+ intercept(replaceTableHeader("TBLPROPERTIES('test' = 'test2')"),
+ "Found duplicate clauses: TBLPROPERTIES")
+ intercept(replaceTableHeader("LOCATION '/tmp/file'"),
+ "Found duplicate clauses: LOCATION")
+ intercept(replaceTableHeader("COMMENT 'a table'"),
+ "Found duplicate clauses: COMMENT")
+ intercept(replaceTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"),
+ "Found duplicate clauses: CLUSTERED BY")
+ intercept(replaceTableHeader("PARTITIONED BY (b)"),
+ "Found duplicate clauses: PARTITIONED BY")
}
test("support for other types in OPTIONS") {
- val sql =
+ val createSql =
"""
|CREATE TABLE table_name USING json
|OPTIONS (a 1, b 0.1, c TRUE)
""".stripMargin
-
- parsePlan(sql) match {
- case create: CreateTableStatement =>
- assert(create.tableName == Seq("table_name"))
- assert(create.tableSchema == new StructType)
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties.isEmpty)
- assert(create.provider == "json")
- assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true"))
- assert(create.location.isEmpty)
- assert(create.comment.isEmpty)
- assert(!create.ifNotExists)
-
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
- s"got ${other.getClass.getName}: $sql")
+ val replaceSql =
+ """
+ |REPLACE TABLE table_name USING json
+ |OPTIONS (a 1, b 0.1, c TRUE)
+ """.stripMargin
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(
+ sql,
+ TableSpec(
+ Seq("table_name"),
+ Some(new StructType),
+ Seq.empty[Transform],
+ Option.empty[BucketSpec],
+ Map.empty[String, String],
+ "json",
+ Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
+ None,
+ None),
+ expectedIfNotExists = false)
}
}
@@ -352,27 +351,28 @@ class DDLParserSuite extends AnalysisTest {
|AS SELECT * FROM src
""".stripMargin
- checkParsing(s1)
- checkParsing(s2)
- checkParsing(s3)
-
- def checkParsing(sql: String): Unit = {
- parsePlan(sql) match {
- case create: CreateTableAsSelectStatement =>
- assert(create.tableName == Seq("mydb", "page_view"))
- assert(create.partitioning.isEmpty)
- assert(create.bucketSpec.isEmpty)
- assert(create.properties == Map("p1" -> "v1", "p2" -> "v2"))
- assert(create.provider == "parquet")
- assert(create.options.isEmpty)
- assert(create.location.contains("/user/external/page_view"))
- assert(create.comment.contains("This is the staging page view table"))
- assert(create.ifNotExists)
+ val s4 =
+ """
+ |REPLACE TABLE mydb.page_view
+ |USING parquet
+ |COMMENT 'This is the staging page view table'
+ |LOCATION '/user/external/page_view'
+ |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
+ |AS SELECT * FROM src
+ """.stripMargin
- case other =>
- fail(s"Expected to parse ${classOf[CreateTableAsSelectStatement].getClass.getName} " +
- s"from query, got ${other.getClass.getName}: $sql")
- }
+ val expectedTableSpec = TableSpec(
+ Seq("mydb", "page_view"),
+ None,
+ Seq.empty[Transform],
+ None,
+ Map("p1" -> "v1", "p2" -> "v2"),
+ "parquet",
+ Map.empty[String, String],
+ Some("/user/external/page_view"),
+ Some("This is the staging page view table"))
+ Seq(s1, s2, s3, s4).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = true)
}
}
@@ -403,6 +403,28 @@ class DDLParserSuite extends AnalysisTest {
parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true))
}
+ private def testCreateOrReplaceDdl(
+ sqlStatement: String,
+ tableSpec: TableSpec,
+ expectedIfNotExists: Boolean): Unit = {
+ val parsedPlan = parsePlan(sqlStatement)
+ val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT)
+ parsedPlan match {
+ case create: CreateTableStatement if newTableToken == "CREATE" =>
+ assert(create.ifNotExists == expectedIfNotExists)
+ case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
+ assert(ctas.ifNotExists == expectedIfNotExists)
+ case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
+ case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" =>
+ case other =>
+ fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
+ " should create a CreateTableStatement, and REPLACE TABLE should create a" +
+ s" ReplaceTableStatement. Statement: $sqlStatement, plan type:" +
+ s" ${parsedPlan.getClass.getName}.")
+ }
+ assert(TableSpec(parsedPlan) === tableSpec)
+ }
+
// ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
// ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
test("alter view: alter view properties") {
@@ -593,4 +615,69 @@ class DDLParserSuite extends AnalysisTest {
Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
}
}
+
+ private case class TableSpec(
+ name: Seq[String],
+ schema: Option[StructType],
+ partitioning: Seq[Transform],
+ bucketSpec: Option[BucketSpec],
+ properties: Map[String, String],
+ provider: String,
+ options: Map[String, String],
+ location: Option[String],
+ comment: Option[String])
+
+ private object TableSpec {
+ def apply(plan: LogicalPlan): TableSpec = {
+ plan match {
+ case create: CreateTableStatement =>
+ TableSpec(
+ create.tableName,
+ Some(create.tableSchema),
+ create.partitioning,
+ create.bucketSpec,
+ create.properties,
+ create.provider,
+ create.options,
+ create.location,
+ create.comment)
+ case replace: ReplaceTableStatement =>
+ TableSpec(
+ replace.tableName,
+ Some(replace.tableSchema),
+ replace.partitioning,
+ replace.bucketSpec,
+ replace.properties,
+ replace.provider,
+ replace.options,
+ replace.location,
+ replace.comment)
+ case ctas: CreateTableAsSelectStatement =>
+ TableSpec(
+ ctas.tableName,
+ Some(ctas.asSelect).filter(_.resolved).map(_.schema),
+ ctas.partitioning,
+ ctas.bucketSpec,
+ ctas.properties,
+ ctas.provider,
+ ctas.options,
+ ctas.location,
+ ctas.comment)
+ case rtas: ReplaceTableAsSelectStatement =>
+ TableSpec(
+ rtas.tableName,
+ Some(rtas.asSelect).filter(_.resolved).map(_.schema),
+ rtas.partitioning,
+ rtas.bucketSpec,
+ rtas.properties,
+ rtas.provider,
+ rtas.options,
+ rtas.location,
+ rtas.comment)
+ case other =>
+ fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
+ s" from query, got ${other.getClass.getName}.")
+ }
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 1b7bb169b36f..8685d2f7a856 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@@ -94,6 +94,38 @@ case class DataSourceResolution(
convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
}
+ case ReplaceTableStatement(
+ AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
+ V1WriteProvider(provider), options, location, comment, orCreate) =>
+ throw new AnalysisException(
+ s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
+ s" API. Write provider name: $provider, identifier: $table.")
+
+ case ReplaceTableAsSelectStatement(
+ AsTableIdentifier(table), query, partitionCols, bucketSpec, properties,
+ V1WriteProvider(provider), options, location, comment, orCreate) =>
+ throw new AnalysisException(
+ s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
+ s" API. Write provider name: $provider, identifier: $table.")
+
+ case replace: ReplaceTableStatement =>
+ // the provider was not a v1 source, convert to a v2 plan
+ val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName
+ val catalog = maybeCatalog.orElse(defaultCatalog)
+ .getOrElse(throw new AnalysisException(
+ s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
+ .asTableCatalog
+ convertReplaceTable(catalog, identifier, replace)
+
+ case rtas: ReplaceTableAsSelectStatement =>
+ // the provider was not a v1 source, convert to a v2 plan
+ val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName
+ val catalog = maybeCatalog.orElse(defaultCatalog)
+ .getOrElse(throw new AnalysisException(
+ s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
+ .asTableCatalog
+ convertRTAS(catalog, identifier, rtas)
+
case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
DropTable(catalog.asTableCatalog, ident, ifExists)
@@ -226,6 +258,43 @@ case class DataSourceResolution(
ignoreIfExists = create.ifNotExists)
}
+ private def convertRTAS(
+ catalog: TableCatalog,
+ identifier: Identifier,
+ rtas: ReplaceTableAsSelectStatement): ReplaceTableAsSelect = {
+ // convert the bucket spec and add it as a transform
+ val partitioning = rtas.partitioning ++ rtas.bucketSpec.map(_.asTransform)
+ val properties = convertTableProperties(
+ rtas.properties, rtas.options, rtas.location, rtas.comment, rtas.provider)
+
+ ReplaceTableAsSelect(
+ catalog,
+ identifier,
+ partitioning,
+ rtas.asSelect,
+ properties,
+ writeOptions = rtas.options.filterKeys(_ != "path"),
+ orCreate = rtas.orCreate)
+ }
+
+ private def convertReplaceTable(
+ catalog: TableCatalog,
+ identifier: Identifier,
+ replace: ReplaceTableStatement): ReplaceTable = {
+ // convert the bucket spec and add it as a transform
+ val partitioning = replace.partitioning ++ replace.bucketSpec.map(_.asTransform)
+ val properties = convertTableProperties(
+ replace.properties, replace.options, replace.location, replace.comment, replace.provider)
+
+ ReplaceTable(
+ catalog,
+ identifier,
+ replace.tableSchema,
+ partitioning,
+ properties,
+ orCreate = replace.orCreate)
+ }
+
private def convertTableProperties(
properties: Map[String, String],
options: Map[String, String],
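In short, the resolution rule above rejects REPLACE TABLE for providers that resolve to the legacy v1 write path and converts everything else into the v2 ReplaceTable / ReplaceTableAsSelect plans, folding any bucket spec into an extra bucket transform. A hedged sketch, assuming a SparkSession `spark`, a registered v2 catalog `testcat`, and an illustrative provider name `foo`:

```scala
// Converted to the v2 plan; CLUSTERED BY (id) INTO 4 BUCKETS becomes a bucket(4, id) transform
// appended to the partitioning, and orCreate = true because of CREATE OR REPLACE.
spark.sql(
  """CREATE OR REPLACE TABLE testcat.db.tbl (id BIGINT, ts TIMESTAMP) USING foo
    |PARTITIONED BY (days(ts)) CLUSTERED BY (id) INTO 4 BUCKETS""".stripMargin)

// A provider that only supports the v1 write path is rejected by the rule above with:
// "Replacing tables is not supported using the legacy / v1 Spark external catalog API."
```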
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4f8507da3924..52e289653635 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -21,9 +21,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.catalog.v2.StagingTableCatalog
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -165,8 +166,45 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
- CreateTableAsSelectExec(
- catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
+ catalog match {
+ case staging: StagingTableCatalog =>
+ AtomicCreateTableAsSelectExec(
+ staging, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
+ case _ =>
+ CreateTableAsSelectExec(
+ catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
+ }
+
+ case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
+ catalog match {
+ case staging: StagingTableCatalog =>
+ AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
+ case _ =>
+ ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
+ }
+
+ case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
+ val writeOptions = new CaseInsensitiveStringMap(options.asJava)
+ catalog match {
+ case staging: StagingTableCatalog =>
+ AtomicReplaceTableAsSelectExec(
+ staging,
+ ident,
+ parts,
+ planLater(query),
+ props,
+ writeOptions,
+ orCreate = orCreate) :: Nil
+ case _ =>
+ ReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ parts,
+ planLater(query),
+ props,
+ writeOptions,
+ orCreate = orCreate) :: Nil
+ }
case AppendData(r: DataSourceV2Relation, query, _) =>
AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil
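Whether the atomic or non-atomic physical nodes above are chosen depends only on whether the resolved catalog implements StagingTableCatalog. A sketch, assuming a SparkSession `spark`; the catalog name and implementation class are hypothetical:

```scala
// Registering a catalog whose class mixes in StagingTableCatalog routes the statement below
// through AtomicReplaceTableAsSelectExec; a plain TableCatalog falls back to
// ReplaceTableAsSelectExec (drop, create, then write).
spark.conf.set("spark.sql.catalog.mycat", "com.example.MyStagingCatalog")
spark.sql("CREATE OR REPLACE TABLE mycat.db.t USING parquet AS SELECT * FROM src")
```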
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
new file mode 100644
index 000000000000..35d86ee2abbb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.sources.v2.StagedTable
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class ReplaceTableExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ tableProperties: Map[String, String],
+ orCreate: Boolean) extends LeafExecNode {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ catalog.dropTable(ident)
+ } else if (!orCreate) {
+ throw new CannotReplaceMissingTableException(ident)
+ }
+ catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava)
+ sqlContext.sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
+
+case class AtomicReplaceTableExec(
+ catalog: StagingTableCatalog,
+ identifier: Identifier,
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ tableProperties: Map[String, String],
+ orCreate: Boolean) extends LeafExecNode {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val staged = if (orCreate) {
+ catalog.stageCreateOrReplace(
+ identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+ } else if (catalog.tableExists(identifier)) {
+ try {
+ catalog.stageReplace(
+ identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+ } catch {
+ case e: NoSuchTableException =>
+ throw new CannotReplaceMissingTableException(identifier, Some(e))
+ }
+ } else {
+ throw new CannotReplaceMissingTableException(identifier)
+ }
+ commitOrAbortStagedChanges(staged)
+
+ sqlContext.sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+
+ private def commitOrAbortStagedChanges(staged: StagedTable): Unit = {
+ Utils.tryWithSafeFinallyAndFailureCallbacks({
+ staged.commitStagedChanges()
+ })(catchBlock = {
+ staged.abortStagedChanges()
+ })
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 6c771ea98832..9f644de1929a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,15 +26,15 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
-import org.apache.spark.sql.sources.v2.SupportsWrite
+import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}
@@ -51,11 +51,13 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
}
/**
- * Physical plan node for v2 create table as select.
+ * Physical plan node for v2 create table as select when the catalog does not support staging
+ * the table creation.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
- * If either table creation or the append fails, the table will be deleted. This implementation does
- * not provide an atomic CTAS.
+ * If either table creation or the append fails, the table will be deleted. This implementation is
+ * not atomic; for an atomic variant for catalogs that support the appropriate features, see
+ * AtomicCreateTableAsSelectExec.
*/
case class CreateTableAsSelectExec(
catalog: TableCatalog,
@@ -78,7 +80,8 @@ case class CreateTableAsSelectExec(
}
Utils.tryWithSafeFinallyAndFailureCallbacks({
- catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match {
+ catalog.createTable(
+ ident, query.schema, partitioning.toArray, properties.asJava) match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
@@ -89,15 +92,145 @@ case class CreateTableAsSelectExec(
case _ =>
// table does not support writes
- throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
+ throw new SparkException(
+ s"Table implementation does not support writes: ${ident.quoted}")
}
+ })(catchBlock = {
+ catalog.dropTable(ident)
+ })
+ }
+}
+/**
+ * Physical plan node for v2 create table as select when the catalog supports staging
+ * table creation.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * The CTAS operation is atomic. The creation of the table is staged, and the commit of the write
+ * should publish the table's metadata and contents as a single unit. If the
+ * write fails, the table is instructed to roll back all staged changes.
+ */
+case class AtomicCreateTableAsSelectExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: SparkPlan,
+ properties: Map[String, String],
+ writeOptions: CaseInsensitiveStringMap,
+ ifNotExists: Boolean) extends AtomicTableWriteExec {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (catalog.tableExists(ident)) {
+ if (ifNotExists) {
+ return sparkContext.parallelize(Seq.empty, 1)
+ }
+
+ throw new TableAlreadyExistsException(ident)
+ }
+ val stagedTable = catalog.stageCreate(
+ ident, query.schema, partitioning.toArray, properties.asJava)
+ writeToStagedTable(stagedTable, writeOptions, ident)
+ }
+}
+
+/**
+ * Physical plan node for v2 replace table as select when the catalog does not support staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
+ * CTAS. For an atomic implementation for catalogs with the appropriate support, see
+ * AtomicReplaceTableAsSelectExec.
+ */
+case class ReplaceTableAsSelectExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: SparkPlan,
+ properties: Map[String, String],
+ writeOptions: CaseInsensitiveStringMap,
+ orCreate: Boolean) extends AtomicTableWriteExec {
+
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ // Note that this operation is potentially unsafe, but these are the strict semantics of
+ // RTAS if the catalog does not support atomic operations.
+ //
+ // There are several known cases where the table will be dropped and cannot be recovered:
+ //
+ // 1. Creating the new table fails,
+ // 2. Writing to the new table fails,
+ // 3. The table returned by catalog.createTable doesn't support writing.
+ if (catalog.tableExists(ident)) {
+ catalog.dropTable(ident)
+ } else if (!orCreate) {
+ throw new CannotReplaceMissingTableException(ident)
+ }
+ val createdTable = catalog.createTable(
+ ident, query.schema, partitioning.toArray, properties.asJava)
+ Utils.tryWithSafeFinallyAndFailureCallbacks({
+ createdTable match {
+ case table: SupportsWrite =>
+ val batchWrite = table.newWriteBuilder(writeOptions)
+ .withInputDataSchema(query.schema)
+ .withQueryId(UUID.randomUUID().toString)
+ .buildForBatch()
+
+ doWrite(batchWrite)
+
+ case _ =>
+ // table does not support writes
+ throw new SparkException(
+ s"Table implementation does not support writes: ${ident.quoted}")
+ }
})(catchBlock = {
catalog.dropTable(ident)
})
}
}
+/**
+ *
+ * Physical plan node for v2 replace table as select when the catalog supports staging
+ * table replacement.
+ *
+ * A new table will be created using the schema of the query, and rows from the query are appended.
+ * If the table exists, its contents and schema should be replaced with the schema and the contents
+ * of the query. This implementation is atomic. The table replacement is staged, and the commit
+ * operation at the end should replace the table's metadata and contents as a single unit. If the
+ * write fails, the table is instructed to roll back its staged changes, and the previously existing
+ * table is left untouched.
+ */
+case class AtomicReplaceTableAsSelectExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: SparkPlan,
+ properties: Map[String, String],
+ writeOptions: CaseInsensitiveStringMap,
+ orCreate: Boolean) extends AtomicTableWriteExec {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val staged = if (orCreate) {
+ catalog.stageCreateOrReplace(
+ ident, query.schema, partitioning.toArray, properties.asJava)
+ } else if (catalog.tableExists(ident)) {
+ try {
+ catalog.stageReplace(
+ ident, query.schema, partitioning.toArray, properties.asJava)
+ } catch {
+ case e: NoSuchTableException =>
+ throw new CannotReplaceMissingTableException(ident, Some(e))
+ }
+ } else {
+ throw new CannotReplaceMissingTableException(ident)
+ }
+ writeToStagedTable(staged, writeOptions, ident)
+ }
+}
+
/**
* Physical plan node for append into a v2 table.
*
@@ -330,6 +463,36 @@ object DataWritingSparkTask extends Logging {
}
}
+private[v2] trait AtomicTableWriteExec extends V2TableWriteExec {
+ import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
+
+ protected def writeToStagedTable(
+ stagedTable: StagedTable,
+ writeOptions: CaseInsensitiveStringMap,
+ ident: Identifier): RDD[InternalRow] = {
+ Utils.tryWithSafeFinallyAndFailureCallbacks({
+ stagedTable match {
+ case table: SupportsWrite =>
+ val batchWrite = table.newWriteBuilder(writeOptions)
+ .withInputDataSchema(query.schema)
+ .withQueryId(UUID.randomUUID().toString)
+ .buildForBatch()
+
+ val writtenRows = doWrite(batchWrite)
+ stagedTable.commitStagedChanges()
+ writtenRows
+ case _ =>
+ // Table does not support writes - staged changes are also rolled back below.
+ throw new SparkException(
+ s"Table implementation does not support writes: ${ident.quoted}")
+ }
+ })(catchBlock = {
+ // Failure rolls back the staged writes and metadata changes.
+ stagedTable.abortStagedChanges()
+ })
+ }
+}
+
private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)
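Which of these physical nodes gets planned depends only on whether the catalog can stage tables. A rough sketch of that dispatch, under the assumption of a helper like the one below (this is not the strategy code in this patch; it only illustrates the rule the comments above describe):

    import org.apache.spark.sql.catalog.v2.{StagingTableCatalog, TableCatalog}

    object AtomicPlanSketch {
      // Stage-capable catalogs get the Atomic* nodes; everything else falls back to the
      // non-atomic create/drop-then-recreate implementations documented above.
      def supportsAtomicCtasAndRtas(catalog: TableCatalog): Boolean = catalog match {
        case _: StagingTableCatalog => true
        case _ => false
      }
    }
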
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index c90090aca3d4..c173bdb95370 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalog.v2.Identifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
@@ -39,6 +39,8 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
+ spark.conf.set(
+ "spark.sql.catalog.testcat_atomic", classOf[TestStagingInMemoryCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set(V2_SESSION_CATALOG.key, classOf[TestInMemoryTableCatalog].getName)
@@ -50,6 +52,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
after {
spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
+ spark.catalog("testcat_atomic").asInstanceOf[TestInMemoryTableCatalog].clearTables()
spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables()
}
@@ -159,20 +162,168 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
}
test("CreateTableAsSelect: use v2 plan because catalog is set") {
+ val basicCatalog = spark.catalog("testcat").asTableCatalog
+ val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val basicIdentifier = "testcat.table_name"
+ val atomicIdentifier = "testcat_atomic.table_name"
+
+ Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+ case (catalog, identifier) =>
+ spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
+
+ val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(table.name == identifier)
+ assert(table.partitioning.isEmpty)
+ assert(table.properties == Map("provider" -> "foo").asJava)
+ assert(table.schema == new StructType()
+ .add("id", LongType, nullable = false)
+ .add("data", StringType))
+
+ val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
+ }
+ }
+
+ test("ReplaceTableAsSelect: basic v2 implementation.") {
+ val basicCatalog = spark.catalog("testcat").asTableCatalog
+ val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val basicIdentifier = "testcat.table_name"
+ val atomicIdentifier = "testcat_atomic.table_name"
+
+ Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
+ case (catalog, identifier) =>
+ spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
+ val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM source")
+ val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(replacedTable != originalTable, "Table should have been replaced.")
+ assert(replacedTable.name == identifier)
+ assert(replacedTable.partitioning.isEmpty)
+ assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+ assert(replacedTable.schema == new StructType()
+ .add("id", LongType, nullable = false))
+
+ val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
+ checkAnswer(
+ spark.internalCreateDataFrame(rdd, replacedTable.schema),
+ spark.table("source").select("id"))
+ }
+ }
+
+ test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat.table_name" +
+ s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
+ s" AS SELECT id FROM source")
+ }
+ assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+ "Table should have been dropped as a result of the replace.")
+ }
+
+ test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently if the" +
+ " subsequent table creation fails.") {
+ spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = spark.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
- assert(table.name == "testcat.table_name")
- assert(table.partitioning.isEmpty)
- assert(table.properties == Map("provider" -> "foo").asJava)
- assert(table.schema == new StructType()
- .add("id", LongType, nullable = false)
- .add("data", StringType))
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat.table_name" +
+ s" USING foo" +
+ s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+ s" AS SELECT id FROM source")
+ }
- val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
- checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
+ assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
+ "Table should have been dropped and failed to be created.")
+ }
+
+ test("ReplaceTableAsSelect: Atomic catalog does not drop the table when replace fails.") {
+ spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, data FROM source")
+ val testCatalog = spark.catalog("testcat_atomic").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+ s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}=true)" +
+ s" AS SELECT id FROM source")
+ }
+
+ var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(maybeReplacedTable === table, "Table should not have changed.")
+
+ intercept[Exception] {
+ spark.sql("REPLACE TABLE testcat_atomic.table_name" +
+ s" USING foo" +
+ s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
+ s" AS SELECT id FROM source")
+ }
+
+ maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(maybeReplacedTable === table, "Table should not have changed.")
+ }
+
+ test("ReplaceTable: Erases the table contents and changes the metadata.") {
+ spark.sql(s"CREATE TABLE testcat.table_name USING $orc2 AS SELECT id, data FROM source")
+
+ val testCatalog = spark.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
+
+ spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo")
+ val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+
+ assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
+ "Replaced table should have no rows after committing.")
+ assert(replaced.schema().fields.length === 1,
+ "Replaced table should have new schema.")
+ assert(replaced.schema().fields(0).name === "id",
+ "Replaced table should have new schema.")
+ }
+
+ test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") {
+ Seq("testcat", "testcat_atomic").foreach { catalog =>
+ spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source")
+ spark.sql(
+ s"CREATE OR REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source")
+
+ val testCatalog = spark.catalog(catalog).asTableCatalog
+ val createdTable = testCatalog.loadTable(Identifier.of(Array(), "created"))
+ val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "replaced"))
+
+ assert(createdTable.asInstanceOf[InMemoryTable].rows ===
+ replacedTable.asInstanceOf[InMemoryTable].rows)
+ assert(createdTable.schema === replacedTable.schema)
+ }
+ }
+
+ test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") {
+ Seq("testcat", "testcat_atomic").foreach { catalog =>
+ spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source")
+ intercept[CannotReplaceMissingTableException] {
+ spark.sql(s"REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source")
+ }
+ }
+ }
+
+ test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") {
+ import TestInMemoryTableCatalog._
+ spark.sql(s"CREATE TABLE testcat_atomic.created USING $orc2 AS SELECT id, data FROM source")
+ intercept[CannotReplaceMissingTableException] {
+ spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
+ s" USING $orc2" +
+ s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
+ s" AS SELECT id, data FROM source")
+ }
}
test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 380df7a36596..95398082b580 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -23,11 +23,11 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
@@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class TestInMemoryTableCatalog extends TableCatalog {
import CatalogV2Implicits._
- private val tables: util.Map[Identifier, InMemoryTable] =
+ protected val tables: util.Map[Identifier, InMemoryTable] =
new ConcurrentHashMap[Identifier, InMemoryTable]()
private var _name: Option[String] = None
@@ -66,11 +66,10 @@ class TestInMemoryTableCatalog extends TableCatalog {
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
-
if (tables.containsKey(ident)) {
throw new TableAlreadyExistsException(ident)
}
-
+ TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
if (partitions.nonEmpty) {
throw new UnsupportedOperationException(
s"Catalog $name: Partitioned tables are not supported")
@@ -104,7 +103,9 @@ class TestInMemoryTableCatalog extends TableCatalog {
}
}
- override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
+ override def dropTable(ident: Identifier): Boolean = {
+ Option(tables.remove(ident)).isDefined
+ }
def clearTables(): Unit = {
tables.clear()
@@ -114,7 +115,7 @@ class TestInMemoryTableCatalog extends TableCatalog {
/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
*/
-private class InMemoryTable(
+class InMemoryTable(
val name: String,
val schema: StructType,
override val properties: util.Map[String, String])
@@ -155,6 +156,7 @@ private class InMemoryTable(
}
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
+ TestInMemoryTableCatalog.maybeSimulateFailedTableWrite(options)
new WriteBuilder with SupportsTruncate {
private var shouldTruncate: Boolean = false
@@ -196,7 +198,142 @@ private class InMemoryTable(
}
}
-private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+object TestInMemoryTableCatalog {
+ val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
+ val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
+ val SIMULATE_DROP_BEFORE_REPLACE_PROPERTY = "spark.sql.test.simulateDropBeforeReplace"
+
+ def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
+ if ("true".equalsIgnoreCase(
+ tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) {
+ throw new IllegalStateException("Manual create table failure.")
+ }
+ }
+
+ def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
+ if (tableOptions.getBoolean(
+ TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
+ throw new IllegalStateException("Manual write to table failure.")
+ }
+ }
+}
+
+class TestStagingInMemoryCatalog
+ extends TestInMemoryTableCatalog with StagingTableCatalog {
+ import CatalogV2Implicits.IdentifierHelper
+ import org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog._
+
+ override def stageCreate(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ validateStagedTable(partitions, properties)
+ new TestStagedCreateTable(
+ ident,
+ new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
+ }
+
+ override def stageReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ validateStagedTable(partitions, properties)
+ new TestStagedReplaceTable(
+ ident,
+ new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
+ }
+
+ override def stageCreateOrReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ validateStagedTable(partitions, properties)
+ new TestStagedCreateOrReplaceTable(
+ ident,
+ new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
+ }
+
+ private def validateStagedTable(
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Unit = {
+ if (partitions.nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"Catalog $name: Partitioned tables are not supported")
+ }
+
+ maybeSimulateFailedTableCreation(properties)
+ }
+
+ private abstract class TestStagedTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable)
+ extends StagedTable with SupportsWrite with SupportsRead {
+
+ override def abortStagedChanges(): Unit = {}
+
+ override def name(): String = delegateTable.name
+
+ override def schema(): StructType = delegateTable.schema
+
+ override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities
+
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
+ delegateTable.newWriteBuilder(options)
+ }
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ delegateTable.newScanBuilder(options)
+ }
+ }
+
+ private class TestStagedCreateTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
+
+ override def commitStagedChanges(): Unit = {
+ val maybePreCommittedTable = tables.putIfAbsent(ident, delegateTable)
+ if (maybePreCommittedTable != null) {
+ throw new TableAlreadyExistsException(
+ s"Table with identifier $ident and name $name was already created.")
+ }
+ }
+ }
+
+ private class TestStagedReplaceTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
+
+ override def commitStagedChanges(): Unit = {
+ maybeSimulateDropBeforeCommit()
+ val maybePreCommittedTable = tables.replace(ident, delegateTable)
+ if (maybePreCommittedTable == null) {
+ throw new CannotReplaceMissingTableException(ident)
+ }
+ }
+
+ private def maybeSimulateDropBeforeCommit(): Unit = {
+ if ("true".equalsIgnoreCase(
+ delegateTable.properties.get(SIMULATE_DROP_BEFORE_REPLACE_PROPERTY))) {
+ tables.remove(ident)
+ }
+ }
+ }
+
+ private class TestStagedCreateOrReplaceTable(
+ ident: Identifier,
+ delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
+
+ override def commitStagedChanges(): Unit = {
+ tables.put(ident, delegateTable)
+ }
+ }
+}
+
+
+class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
}
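For reference, the atomic path in this suite is enabled purely by configuration: the staging-capable catalog class is registered under a catalog name, and REPLACE TABLE statements against that catalog go through the Atomic* exec nodes. A brief usage sketch with the suite's fixture names (assuming the suite's `spark` session and `source` table):

    // Register the staging-capable test catalog, as the suite's before block does.
    spark.conf.set("spark.sql.catalog.testcat_atomic", classOf[TestStagingInMemoryCatalog].getName)

    // CTAS followed by RTAS against the staging catalog; both are staged and committed atomically,
    // so a failed replace leaves the original table untouched.
    spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, data FROM source")
    spark.sql("REPLACE TABLE testcat_atomic.table_name USING foo AS SELECT id FROM source")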