
Commit 443904a

rdblue and jzhuge authored and committed
[SPARK-27845][SQL] DataSourceV2: InsertTable
## What changes were proposed in this pull request?

Support multiple catalogs in the following InsertTable use cases:

- INSERT INTO [TABLE] catalog.db.tbl
- INSERT OVERWRITE TABLE catalog.db.tbl

Support matrix:

Overwrite | Partitioned Table | Partition Clause | Partition Overwrite Mode | Action
----------|-------------------|------------------|--------------------------|-------
false | * | * | * | AppendData
true | no | (empty) | * | OverwriteByExpression(true)
true | yes | p1,p2 or p1 or p2 or (empty) | STATIC | OverwriteByExpression(true)
true | yes | p1,p2 or p1 or p2 or (empty) | DYNAMIC | OverwritePartitionsDynamic
true | yes | p1=23,p2=3 | * | OverwriteByExpression(p1=23 and p2=3)
true | yes | p1=23,p2 or p1=23 | STATIC | OverwriteByExpression(p1=23)
true | yes | p1=23,p2 or p1=23 | DYNAMIC | OverwritePartitionsDynamic

Notes:

- Assume the partitioned table has two partition columns, p1 and p2.
- `STATIC` is the default Partition Overwrite Mode for data source tables.
- DSv2 tables currently do not support `IfPartitionNotExists`.

## How was this patch tested?

New tests. All existing catalyst and sql/core tests.

Closes #24832 from jzhuge/SPARK-27845-pr.

Lead-authored-by: Ryan Blue <blue@apache.org>
Co-authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
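To make the matrix concrete, here is a minimal sketch of the three write paths in a spark-shell style session. The catalog name `testcat`, the table `testcat.db.tbl` (partitioned by p1 and p2), and the `source` table are illustrative and not part of this commit:

```scala
// Overwrite = false: always resolves to AppendData, partitioned or not.
spark.sql("INSERT INTO testcat.db.tbl SELECT * FROM source")

// Overwrite = true with a fully static PARTITION clause:
// resolves to OverwriteByExpression(p1 = 23 AND p2 = 3).
spark.sql(
  "INSERT OVERWRITE TABLE testcat.db.tbl PARTITION (p1 = 23, p2 = 3) SELECT * FROM source")

// Overwrite = true with a dynamic partition column under DYNAMIC mode:
// resolves to OverwritePartitionsDynamic.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql(
  "INSERT OVERWRITE TABLE testcat.db.tbl PARTITION (p1 = 23, p2) SELECT * FROM source")
```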
1 parent dbd0a2a commit 443904a

File tree

11 files changed: +738 −76 lines

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
@@ -294,8 +294,8 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?                            #insertOverwriteTable
-    | INSERT INTO TABLE? tableIdentifier partitionSpec?                                                   #insertIntoTable
+    : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)?                       #insertOverwriteTable
+    | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)?                              #insertIntoTable
     | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?                          #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
     ;
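The grammar now takes a multi-part identifier, and it also accepts IF NOT EXISTS on plain INSERT INTO so that AstBuilder (below) can reject the combination with a targeted error instead of a generic parse failure. A minimal sketch of the parsing behavior, assuming the catalyst parser entry point; table names are illustrative:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Multi-part names such as catalog.db.tbl now parse into an InsertIntoStatement.
CatalystSqlParser.parsePlan("INSERT INTO testcat.db.tbl SELECT * FROM source")

// IF NOT EXISTS parses at the grammar level but is rejected while building the AST:
// ParseException: Operation not allowed: INSERT INTO ... IF NOT EXISTS
CatalystSqlParser.parsePlan("INSERT INTO testcat.db.tbl IF NOT EXISTS SELECT * FROM source")
```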

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 136 additions & 1 deletion
@@ -25,6 +25,8 @@ import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform}
+import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util.loadTable
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -34,12 +36,14 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, InsertIntoStatement}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
+import org.apache.spark.sql.sources.v2.Table
 import org.apache.spark.sql.types._
 
 /**
@@ -167,6 +171,7 @@ class Analyzer(
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
       ResolveAlterTable ::
+      ResolveInsertInto ::
       ResolveTables ::
       ResolveRelations ::
       ResolveReferences ::
@@ -757,6 +762,136 @@ class Analyzer(
     }
   }
 
+  object ResolveInsertInto extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case i @ InsertIntoStatement(
+          UnresolvedRelation(CatalogObjectIdentifier(Some(tableCatalog), ident)), _, _, _, _)
+          if i.query.resolved =>
+        loadTable(tableCatalog, ident)
+            .map(DataSourceV2Relation.create)
+            .map(relation => {
+              // ifPartitionNotExists is append with validation, but validation is not supported
+              if (i.ifPartitionNotExists) {
+                throw new AnalysisException(
+                  s"Cannot write, IF NOT EXISTS is not supported for table: ${relation.table.name}")
+              }
+
+              val partCols = partitionColumnNames(relation.table)
+              validatePartitionSpec(partCols, i.partitionSpec)
+
+              val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+              val query = addStaticPartitionColumns(relation, i.query, staticPartitions)
+              val dynamicPartitionOverwrite = partCols.size > staticPartitions.size &&
+                  conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+              if (!i.overwrite) {
+                AppendData.byPosition(relation, query)
+              } else if (dynamicPartitionOverwrite) {
+                OverwritePartitionsDynamic.byPosition(relation, query)
+              } else {
+                OverwriteByExpression.byPosition(
+                  relation, query, staticDeleteExpression(relation, staticPartitions))
+              }
+            })
+            .getOrElse(i)
+
+      case i @ InsertIntoStatement(UnresolvedRelation(AsTableIdentifier(_)), _, _, _, _)
+          if i.query.resolved =>
+        InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
+    }
+
+    private def partitionColumnNames(table: Table): Seq[String] = {
+      // get partition column names. in v2, partition columns are columns that are stored using an
+      // identity partition transform because the partition values and the column values are
+      // identical. otherwise, partition values are produced by transforming one or more source
+      // columns and cannot be set directly in a query's PARTITION clause.
+      table.partitioning.flatMap {
+        case IdentityTransform(FieldReference(Seq(name))) => Some(name)
+        case _ => None
+      }
+    }
+
+    private def validatePartitionSpec(
+        partitionColumnNames: Seq[String],
+        partitionSpec: Map[String, Option[String]]): Unit = {
+      // check that each partition name is a partition column. otherwise, it is not valid
+      partitionSpec.keySet.foreach { partitionName =>
+        partitionColumnNames.find(name => conf.resolver(name, partitionName)) match {
+          case Some(_) =>
+          case None =>
+            throw new AnalysisException(
+              s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+        }
+      }
+    }
+
+    private def addStaticPartitionColumns(
+        relation: DataSourceV2Relation,
+        query: LogicalPlan,
+        staticPartitions: Map[String, String]): LogicalPlan = {
+
+      if (staticPartitions.isEmpty) {
+        query
+
+      } else {
+        // add any static value as a literal column
+        val withStaticPartitionValues = {
+          // for each static name, find the column name it will replace and check for unknowns.
+          val outputNameToStaticName = staticPartitions.keySet.map(staticName =>
+            relation.output.find(col => conf.resolver(col.name, staticName)) match {
+              case Some(attr) =>
+                attr.name -> staticName
+              case _ =>
+                throw new AnalysisException(
+                  s"Cannot add static value for unknown column: $staticName")
+            }).toMap
+
+          val queryColumns = query.output.iterator
+
+          // for each output column, add the static value as a literal, or use the next input
+          // column. this does not fail if input columns are exhausted and adds remaining columns
+          // at the end. both cases will be caught by ResolveOutputRelation and will fail the
+          // query with a helpful error message.
+          relation.output.flatMap { col =>
+            outputNameToStaticName.get(col.name).flatMap(staticPartitions.get) match {
+              case Some(staticValue) =>
+                Some(Alias(Cast(Literal(staticValue), col.dataType), col.name)())
+              case _ if queryColumns.hasNext =>
+                Some(queryColumns.next)
+              case _ =>
+                None
+            }
+          } ++ queryColumns
+        }
+
+        Project(withStaticPartitionValues, query)
+      }
+    }
+
+    private def staticDeleteExpression(
+        relation: DataSourceV2Relation,
+        staticPartitions: Map[String, String]): Expression = {
+      if (staticPartitions.isEmpty) {
+        Literal(true)
+      } else {
+        staticPartitions.map { case (name, value) =>
+          relation.output.find(col => conf.resolver(col.name, name)) match {
+            case Some(attr) =>
+              // the delete expression must reference the table's column names, but these
+              // attributes are not available when CheckAnalysis runs because the relation is not
+              // a child of the logical operation. instead, expressions are resolved after
+              // ResolveOutputRelation runs, using the query's column names that will match the
+              // table names at that point. because resolution happens after a future rule, create
+              // an UnresolvedAttribute.
+              EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
+            case None =>
+              throw new AnalysisException(s"Unknown static partition column: $name")
+          }
+        }.reduce(And)
+      }
+    }
+  }
+
   /**
    * Resolve ALTER TABLE statements that use a DSv2 catalog.
   *
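The branching at the heart of the rule is compact enough to restate on its own. A self-contained sketch of the write-plan selection, mirroring `dynamicPartitionOverwrite` and the if/else chain above (the returned strings stand in for the actual logical plan nodes):

```scala
// Condensed restatement of ResolveInsertInto's plan selection, for reference.
def selectWritePlan(
    overwrite: Boolean,
    partitionColumns: Seq[String],
    staticPartitions: Map[String, String],
    dynamicMode: Boolean): String = {
  // dynamic overwrite applies only when at least one partition column is left dynamic
  val dynamicPartitionOverwrite =
    partitionColumns.size > staticPartitions.size && dynamicMode

  if (!overwrite) "AppendData"
  else if (dynamicPartitionOverwrite) "OverwritePartitionsDynamic"
  else "OverwriteByExpression(<static partition filter>)"
}

// Reproduces the support matrix rows, e.g. overwrite of p1=23 under STATIC mode:
selectWritePlan(overwrite = true, Seq("p1", "p2"), Map("p1" -> "23"), dynamicMode = false)
// => OverwriteByExpression(<static partition filter>), i.e. p1 = 23
```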

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 9 additions & 4 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.sql._
 import org.apache.spark.sql.types._
 
 /**
@@ -379,10 +380,14 @@
       Generate(generator, unrequiredChildIndex, outer,
         alias, outputNames.map(UnresolvedAttribute(_)), logicalPlan)
 
-      def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
-        InsertIntoTable(
-          analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false)
+      def insertInto(tableName: String): LogicalPlan = insertInto(table(tableName))
+
+      def insertInto(
+          table: LogicalPlan,
+          partition: Map[String, Option[String]] = Map.empty,
+          overwrite: Boolean = false,
+          ifPartitionNotExists: Boolean = false): LogicalPlan =
+        InsertIntoStatement(table, partition, logicalPlan, overwrite, ifPartitionNotExists)
 
       def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
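A sketch of how the reworked dsl reads in catalyst tests, assuming the usual dsl imports; `source`, `target`, and the column names are illustrative:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

// the old single-argument form still works for the common append case:
val append = table("source").select('a, 'b).insertInto("target")

// the new overload exposes the partition spec, overwrite, and ifPartitionNotExists:
val overwritePartition = table("source").select('a, 'b)
  .insertInto(
    table("target"),
    partition = Map("p1" -> Some("23"), "p2" -> None),  // p2 left dynamic
    overwrite = true)
```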

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 25 additions & 11 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -239,9 +239,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
   /**
    * Parameters used for writing query to a table:
-   * (tableIdentifier, partitionKeys, exists).
+   * (multipartIdentifier, partitionKeys, ifPartitionNotExists).
    */
-  type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean)
+  type InsertTableParams = (Seq[String], Map[String, Option[String]], Boolean)
 
   /**
    * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
@@ -263,11 +263,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     ctx match {
       case table: InsertIntoTableContext =>
-        val (tableIdent, partitionKeys, exists) = visitInsertIntoTable(table)
-        InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, false, exists)
+        val (tableIdent, partition, ifPartitionNotExists) = visitInsertIntoTable(table)
+        InsertIntoStatement(
+          UnresolvedRelation(tableIdent),
+          partition,
+          query,
+          overwrite = false,
+          ifPartitionNotExists)
       case table: InsertOverwriteTableContext =>
-        val (tableIdent, partitionKeys, exists) = visitInsertOverwriteTable(table)
-        InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, true, exists)
+        val (tableIdent, partition, ifPartitionNotExists) = visitInsertOverwriteTable(table)
+        InsertIntoStatement(
+          UnresolvedRelation(tableIdent),
+          partition,
+          query,
+          overwrite = true,
+          ifPartitionNotExists)
       case dir: InsertOverwriteDirContext =>
         val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
         InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
@@ -284,9 +294,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   override def visitInsertIntoTable(
       ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) {
-    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    val tableIdent = visitMultipartIdentifier(ctx.multipartIdentifier)
     val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
 
+    if (ctx.EXISTS != null) {
+      operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
+    }
+
     (tableIdent, partitionKeys, false)
   }
 
@@ -296,13 +310,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   override def visitInsertOverwriteTable(
       ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
     assert(ctx.OVERWRITE() != null)
-    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    val tableIdent = visitMultipartIdentifier(ctx.multipartIdentifier)
     val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
 
     val dynamicPartitionKeys: Map[String, Option[String]] = partitionKeys.filter(_._2.isEmpty)
     if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
-      throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
-        "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
+      operationNotAllowed("IF NOT EXISTS with dynamic partitions: " +
+        dynamicPartitionKeys.keys.mkString(", "), ctx)
     }
 
     (tableIdent, partitionKeys, ctx.EXISTS() != null)
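Putting the builder changes together, a multi-part INSERT target now parses into the new statement node. A sketch, with the tree rendering abbreviated in comments and `testcat.db.tbl` illustrative:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val plan = CatalystSqlParser.parsePlan(
  "INSERT OVERWRITE TABLE testcat.db.tbl PARTITION (p1 = 23, p2) SELECT * FROM source")
// roughly: InsertIntoStatement(
//   UnresolvedRelation([testcat, db, tbl]),
//   Map(p1 -> Some(23), p2 -> None),
//   overwrite = true, ifPartitionNotExists = false)

// dynamic partitions combined with IF NOT EXISTS are still rejected, now via
// operationNotAllowed:
// CatalystSqlParser.parsePlan(
//   "INSERT OVERWRITE TABLE tbl PARTITION (p1 = 23, p2) IF NOT EXISTS SELECT * FROM source")
// => ParseException: Operation not allowed: IF NOT EXISTS with dynamic partitions: p2
```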
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/InsertIntoStatement.scala

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * An INSERT INTO statement, as parsed from SQL.
+ *
+ * @param table the logical plan representing the table.
+ * @param query the logical plan representing data to write to.
+ * @param overwrite overwrite existing table or partitions.
+ * @param partitionSpec a map from the partition key to the partition value (optional).
+ *                      If the value is missing, dynamic partition insert will be performed.
+ *                      As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have
+ *                      Map('a' -> Some('1'), 'b' -> Some('2')),
+ *                      and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ *                      would have Map('a' -> Some('1'), 'b' -> None).
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
+ */
+case class InsertIntoStatement(
+    table: LogicalPlan,
+    partitionSpec: Map[String, Option[String]],
+    query: LogicalPlan,
+    overwrite: Boolean,
+    ifPartitionNotExists: Boolean) extends ParsedStatement {
+
+  require(overwrite || !ifPartitionNotExists,
+    "IF NOT EXISTS is only valid in INSERT OVERWRITE")
+  require(partitionSpec.values.forall(_.nonEmpty) || !ifPartitionNotExists,
+    "IF NOT EXISTS is only valid with static partitions")
+
+  override def children: Seq[LogicalPlan] = query :: Nil
+}
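The two `require` guards enforce the same validity rules the parser checks, so a hand-built statement cannot bypass them. A small sketch; `OneRowRelation` stands in for a real query, and the multi-part `UnresolvedRelation` follows the AstBuilder change above:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement

val rel = UnresolvedRelation(Seq("testcat", "db", "tbl"))  // illustrative name
val query = OneRowRelation()

// valid: IF NOT EXISTS with OVERWRITE and a fully static partition spec
InsertIntoStatement(rel, Map("p1" -> Some("1")), query,
  overwrite = true, ifPartitionNotExists = true)

// trips the first require: IF NOT EXISTS without OVERWRITE
// InsertIntoStatement(rel, Map("p1" -> Some("1")), query,
//   overwrite = false, ifPartitionNotExists = true)

// trips the second require: IF NOT EXISTS with a dynamic partition (p2 -> None)
// InsertIntoStatement(rel, Map("p1" -> Some("1"), "p2" -> None), query,
//   overwrite = true, ifPartitionNotExists = true)
```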
