
Commit 5a92ecc

aokolnychyi authored and cloud-fan committed
[SPARK-38085][SQL] DataSource V2: Handle DELETE commands for group-based sources
### What changes were proposed in this pull request?

This PR contains changes to rewrite DELETE operations for V2 data sources that can replace groups of data (e.g. files, partitions).

### Why are the changes needed?

These changes are needed to support row-level operations in Spark per SPIP SPARK-35801.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests.

Closes #35395 from aokolnychyi/spark-38085.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 988af33 commit 5a92ecc

File tree

23 files changed: +1407 −25 lines changed

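
Before the per-file diffs, a quick illustration of the kind of statement the new rewrite targets. This is a hypothetical sketch: it assumes a catalog `testcat` backed by a connector whose tables implement SupportsRowLevelOperations (the catalog and table names are not part of this commit).

// With this change, the DELETE below is resolved into a ReplaceData plan that reads the
// matching groups (e.g. files), filters out the rows to be deleted, and writes the groups back.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sql("DELETE FROM testcat.db.orders WHERE status = 'CANCELLED'")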

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 0 deletions

@@ -319,6 +319,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveRandomSeed ::
       ResolveBinaryArithmetic ::
       ResolveUnion ::
+      RewriteDeleteFromTable ::
       typeCoercionRules ++
       Seq(ResolveWithCTE) ++
       extendedResolutionRules : _*),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, ReplaceData}
import org.apache.spark.sql.connector.catalog.{SupportsDelete, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * A rule that rewrites DELETE operations using plans that operate on individual or groups of rows.
 *
 * If a table implements [[SupportsDelete]] and [[SupportsRowLevelOperations]], this rule will
 * still rewrite the DELETE operation but the optimizer will check whether this particular DELETE
 * statement can be handled by simply passing delete filters to the connector. If so, the optimizer
 * will discard the rewritten plan and will allow the data source to delete using filters.
 */
object RewriteDeleteFromTable extends RewriteRowLevelCommand {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
      EliminateSubqueryAliases(aliasedTable) match {
        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
          // don't rewrite as the table supports truncation
          d

        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
          val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
          buildReplaceDataPlan(r, table, cond)

        case DataSourceV2Relation(_: SupportsDelete, _, _, _, _) =>
          // don't rewrite as the table supports deletes only with filters
          d

        case DataSourceV2Relation(t, _, _, _, _) =>
          throw QueryCompilationErrors.tableDoesNotSupportDeletesError(t)

        case _ =>
          d
      }
  }

  // build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions)
  private def buildReplaceDataPlan(
      relation: DataSourceV2Relation,
      operationTable: RowLevelOperationTable,
      cond: Expression): ReplaceData = {

    // resolve all required metadata attrs that may be used for grouping data on write
    // for instance, JDBC data source may cluster data by shard/host before writing
    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)

    // construct a read relation and include all required metadata columns
    val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs)

    // construct a plan that contains unmatched rows in matched groups that must be carried over
    // such rows do not match the condition but have to be copied over as the source can replace
    // only groups of rows (e.g. if a source supports replacing files, unmatched rows in matched
    // files must be carried over)
    // it is safe to negate the condition here as the predicate pushdown for group-based row-level
    // operations is handled in a special way
    val remainingRowsFilter = Not(EqualNullSafe(cond, TrueLiteral))
    val remainingRowsPlan = Filter(remainingRowsFilter, readRelation)

    // build a plan to replace read groups in the table
    val writeRelation = relation.copy(table = operationTable)
    ReplaceData(writeRelation, cond, remainingRowsPlan, relation)
  }
}
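
A note on the negated condition above: `Not(EqualNullSafe(cond, TrueLiteral))` is used instead of a plain `Not(cond)` so that rows for which the DELETE condition evaluates to null are treated as unmatched and copied over rather than silently dropped. A minimal sketch of the difference using standalone Catalyst expressions (illustration only, not part of the patch):

import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.types.BooleanType

// pretend the DELETE condition evaluated to null for some row
val cond = Literal(null, BooleanType)

// plain negation is also null, so a Filter would drop the row (and effectively delete it)
assert(Not(cond).eval() == null)

// the null-safe form yields true, so the row is kept and carried over into the replaced group
assert(Not(EqualNullSafe(cond, TrueLiteral)).eval() == true)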

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationInfoImpl, RowLevelOperationTable}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait RewriteRowLevelCommand extends Rule[LogicalPlan] {

  protected def buildOperationTable(
      table: SupportsRowLevelOperations,
      command: Command,
      options: CaseInsensitiveStringMap): RowLevelOperationTable = {
    val info = RowLevelOperationInfoImpl(command, options)
    val operation = table.newRowLevelOperationBuilder(info).build()
    RowLevelOperationTable(table, operation)
  }

  protected def buildRelationWithAttrs(
      relation: DataSourceV2Relation,
      table: RowLevelOperationTable,
      metadataAttrs: Seq[AttributeReference]): DataSourceV2Relation = {

    val attrs = dedupAttrs(relation.output ++ metadataAttrs)
    relation.copy(table = table, output = attrs)
  }

  protected def dedupAttrs(attrs: Seq[AttributeReference]): Seq[AttributeReference] = {
    val exprIds = mutable.Set.empty[ExprId]
    attrs.flatMap { attr =>
      if (exprIds.contains(attr.exprId)) {
        None
      } else {
        exprIds += attr.exprId
        Some(attr)
      }
    }
  }

  protected def resolveRequiredMetadataAttrs(
      relation: DataSourceV2Relation,
      operation: RowLevelOperation): Seq[AttributeReference] = {

    V2ExpressionUtils.resolveRefs[AttributeReference](
      operation.requiredMetadataAttributes,
      relation)
  }
}
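
For reference, `buildOperationTable` above delegates to the connector: a table that implements SupportsRowLevelOperations returns a RowLevelOperationBuilder, and the built operation can request hidden metadata columns through requiredMetadataAttributes, which resolveRequiredMetadataAttrs then resolves against the read relation. A rough connector-side sketch (hypothetical `_file` metadata column; scan and write builders elided):

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// hypothetical copy-on-write operation for a file-based source (not part of this commit)
class GroupRewriteOperation(info: RowLevelOperationInfo) extends RowLevelOperation {
  override def command(): Command = info.command

  // ask Spark to project a hidden `_file` column so matched files can be identified on write
  override def requiredMetadataAttributes(): Array[NamedReference] =
    Array(Expressions.column("_file"))

  // the actual scan and write wiring is connector-specific and omitted here
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
  override def newWriteBuilder(writeInfo: LogicalWriteInfo): WriteBuilder = ???
}

// hypothetical builder a table would return from SupportsRowLevelOperations.newRowLevelOperationBuilder(info)
class GroupRewriteOperationBuilder(info: RowLevelOperationInfo) extends RowLevelOperationBuilder {
  override def build(): RowLevelOperation = new GroupRewriteOperation(info)
}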

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala

Lines changed: 2 additions & 1 deletion

@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INSET, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.types.BooleanType

@@ -54,6 +54,7 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
     _.containsAnyPattern(NULL_LITERAL, TRUE_OR_FALSE_LITERAL, INSET), ruleId) {
     case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
     case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = replaceNullWithFalse(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = replaceNullWithFalse(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(replaceNullWithFalse(cond)))
     case m @ MergeIntoTable(_, _, mergeCond, matchedActions, notMatchedActions) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalsInPredicate.scala

Lines changed: 1 addition & 0 deletions

@@ -48,6 +48,7 @@ object SimplifyConditionalsInPredicate extends Rule[LogicalPlan] {
     _.containsAnyPattern(CASE_WHEN, IF), ruleId) {
     case f @ Filter(cond, _) => f.copy(condition = simplifyConditional(cond))
     case j @ Join(_, _, _, Some(cond), _) => j.copy(condition = Some(simplifyConditional(cond)))
+    case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = simplifyConditional(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = simplifyConditional(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(simplifyConditional(cond)))
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 51 additions & 0 deletions

@@ -18,12 +18,15 @@
 package org.apache.spark.sql.catalyst.planning
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 
 trait OperationHelper extends AliasHelper with PredicateHelper {

@@ -388,3 +391,51 @@ object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper with Pre
     case _ => None
   }
 }
+
+/**
+ * An extractor for row-level commands such as DELETE, UPDATE, MERGE that were rewritten using plans
+ * that operate on groups of rows.
+ *
+ * This class extracts the following entities:
+ *  - the group-based rewrite plan;
+ *  - the condition that defines matching groups;
+ *  - the read relation that can be either [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
+ *    depending on whether the planning has already happened;
+ */
+object GroupBasedRowLevelOperation {
+  type ReturnType = (ReplaceData, Expression, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _), cond, query, _, _) =>
+      val readRelation = findReadRelation(table, query)
+      readRelation.map((rd, cond, _))
+
+    case _ =>
+      None
+  }
+
+  private def findReadRelation(
+      table: Table,
+      plan: LogicalPlan): Option[LogicalPlan] = {
+
+    val readRelations = plan.collect {
+      case r: DataSourceV2Relation if r.table eq table => r
+      case r: DataSourceV2ScanRelation if r.relation.table eq table => r
+    }
+
+    // in some cases, the optimizer replaces the v2 read relation with a local relation
+    // for example, there is no reason to query the table if the condition is always false
+    // that's why it is valid not to find the corresponding v2 read relation
+
+    readRelations match {
+      case relations if relations.isEmpty =>
+        None
+
+      case Seq(relation) =>
+        Some(relation)
+
+      case relations =>
+        throw new AnalysisException(s"Expected only one row-level read relation: $relations")
+    }
+  }
+}
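
The extractor above is intended for later optimizer rules and planner strategies that need to pull the rewritten command apart again. A minimal, hypothetical usage sketch:

import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// hypothetical helper, not part of this commit
def describeGroupBasedCommand(plan: LogicalPlan): String = plan match {
  case GroupBasedRowLevelOperation(replaceData, cond, readRelation) =>
    s"${replaceData.operation.command} rewritten as ReplaceData, matching groups by `$cond`, " +
      s"reading through ${readRelation.nodeName}"
  case _ =>
    "not a group-based row-level operation"
}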

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 89 additions & 3 deletions

@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
+import org.apache.spark.sql.{sources, AnalysisException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationTable, Write}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
 
 /**

@@ -176,6 +178,80 @@ object OverwritePartitionsDynamic {
   }
 }
 
+trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
+  def operation: RowLevelOperation
+  def condition: Expression
+  def originalTable: NamedRelation
+}
+
+/**
+ * Replace groups of data in an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can replace groups of data (e.g. files, partitions).
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching groups
+ * @param query a query with records that should replace the records that were read
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param write a logical write, if already constructed
+ */
+case class ReplaceData(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    write: Option[Write] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: RowLevelOperation = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  // the incoming query may include metadata columns
+  lazy val dataInput: Seq[Attribute] = {
+    query.output.filter {
+      case MetadataAttribute(_) => false
+      case _ => true
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    // take into account only incoming data columns and ignore metadata columns in the query
+    // they will be discarded after the logical write is built in the optimizer
+    // metadata columns may be needed to request a correct distribution or ordering
+    // but are not passed back to the data source during writes
+
+    table.skipSchemaResolution || (dataInput.size == table.output.size &&
+      dataInput.zip(table.output).forall { case (inAttr, outAttr) =>
+        val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
+        // names and types must match, nullability must be compatible
+        inAttr.name == outAttr.name &&
+          DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
+          (outAttr.nullable || !inAttr.nullable)
+      })
+  }
+
+  override def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)
+
+  override def withNewTable(newTable: NamedRelation): ReplaceData = copy(table = newTable)
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceData = {
+    copy(query = newChild)
+  }
+}
 
 /** A trait used for logical plan nodes that create or replace V2 table definitions. */
 trait V2CreateTablePlan extends LogicalPlan {

@@ -457,6 +533,16 @@ case class DeleteFromTable(
     copy(table = newChild)
   }
 
+/**
+ * The logical plan of the DELETE FROM command that can be executed using data source filters.
+ *
+ * As opposed to [[DeleteFromTable]], this node represents a DELETE operation where the condition
+ * was converted into filters and the data source reported that it can handle all of them.
+ */
+case class DeleteFromTableWithFilters(
+    table: LogicalPlan,
+    condition: Seq[sources.Filter]) extends LeafCommand
+
 /**
  * The logical plan of the UPDATE TABLE command.
  */
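
DeleteFromTableWithFilters above is the shape the command takes when the optimizer decides the group-based rewrite is unnecessary: the DELETE condition is translated into data source filters and, if the connector reports through SupportsDelete.canDeleteWhere that it can handle all of them, the rewritten ReplaceData plan is discarded and deleteWhere is called instead. A rough illustration with made-up filter values:

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// hypothetical filters derived from: DELETE FROM t WHERE region = 'EU' AND priority > 3
val filters: Seq[Filter] = Seq(EqualTo("region", "EU"), GreaterThan("priority", 3))

// conceptually, the optimizer then checks with the connector:
//   if (table.canDeleteWhere(filters.toArray)) {
//     // plan DeleteFromTableWithFilters(relation, filters) and call deleteWhere at execution time
//   } else {
//     // keep the group-based ReplaceData rewrite
//   }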
