From 83a77319f03c3b6e4550253f535efd95ad109078 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 23 Nov 2020 10:16:05 -0800 Subject: [PATCH 1/3] [SPARK-33623][SQL] Add canDeleteWhere to SupportsDelete --- .../sql/connector/catalog/SupportsDelete.java | 19 +++++++++++++++++++ .../spark/sql/connector/InMemoryTable.scala | 12 ++++++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 6 ++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 14 ++++++++++++++ 4 files changed, 51 insertions(+) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java index 106f3283a62c8..542820dde86ef 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java @@ -28,6 +28,25 @@ */ @Evolving public interface SupportsDelete { + + /** + * Checks whether it is possible to delete data from a data source table that matches filter + * expressions. + *
<p>
+ * Rows should be deleted from the data source iff all of the filter expressions match. + * That is, the expressions must be interpreted as a set of filters that are ANDed together. + *
<p>
+ * Spark will call this method to check if the delete is possible without significant effort. + * Otherwise, Spark will try to rewrite the delete operation and produce row-level changes + * if the data source table supports deleting individual records. + * + * @param filters filter expressions, used to select rows to delete when all expressions match + * @return true if the delete operation can be performed + */ + default boolean canDeleteWhere(Filter[] filters) { + return true; + } + /** * Delete data from a data source table that matches filter expressions. *
<p>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala index cfb044b428e41..c4c5835d9d1f5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala @@ -335,6 +335,10 @@ class InMemoryTable( } } + override def canDeleteWhere(filters: Array[Filter]): Boolean = { + InMemoryTable.supportsFilters(filters) + } + override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters) @@ -360,6 +364,14 @@ object InMemoryTable { } } + def supportsFilters(filters: Array[Filter]): Boolean = { + filters.flatMap(splitAnd).forall { + case _: EqualTo => true + case _: IsNotNull => true + case _ => false + } + } + private def extractValue( attr: String, partFieldNames: Seq[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 0c7bc19ad054e..938ba77fede47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -221,6 +221,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat throw new AnalysisException(s"Exec update failed:" + s" cannot translate expression to source filter: $f")) }).toArray + + if (!table.asDeletable.canDeleteWhere(filters)) { + throw new AnalysisException( + s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}") + } + DeleteFromTableExec(table.asDeletable, filters) :: Nil case _ => throw new AnalysisException("DELETE is only supported with v2 tables.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 583bc694dc3be..da1f50ba6127c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1811,6 +1811,20 @@ class DataSourceV2SQLSuite } } + test("DeleteFrom: delete with unsupported predicates") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + val exc = intercept[AnalysisException] { + sql(s"DELETE FROM $t WHERE id > 3 AND p > 3") + } + + assert(spark.table(t).count === 3) + assert(exc.getMessage.contains(s"Cannot delete from table $t")) + } + } + test("DeleteFrom: DELETE is only supported with v2 tables") { // unset this config to use the default v2 session catalog. 
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) From 85ea2c9ff42eb51b674a43c06daec8a6bd1475f4 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 2 Dec 2020 11:39:52 +0200 Subject: [PATCH 2/3] Review comments --- .../spark/sql/connector/catalog/SupportsDelete.java | 8 +++++--- .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java index 542820dde86ef..bee5f5dc6ecb5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java @@ -36,9 +36,11 @@ public interface SupportsDelete { * Rows should be deleted from the data source iff all of the filter expressions match. * That is, the expressions must be interpreted as a set of filters that are ANDed together. *
<p>
- * Spark will call this method to check if the delete is possible without significant effort. - * Otherwise, Spark will try to rewrite the delete operation and produce row-level changes - * if the data source table supports deleting individual records. + * Spark will call this method to check whether {@link #deleteWhere(Filter[])} would reject + * the delete operation because it requires significant effort. If this method returns false, + * Spark will not call {@link #deleteWhere(Filter[])} and will try to rewrite the delete + * operation and produce row-level changes if the data source table supports deleting + * individual records. * * @param filters filter expressions, used to select rows to delete when all expressions match * @return true if the delete operation can be performed diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index da1f50ba6127c..97cf3d931e2fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1814,7 +1814,7 @@ class DataSourceV2SQLSuite test("DeleteFrom: delete with unsupported predicates") { val t = "testcat.ns1.ns2.tbl" withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo") sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") val exc = intercept[AnalysisException] { sql(s"DELETE FROM $t WHERE id > 3 AND p > 3") From 8b909f3abeb6ef84e9496b7f67bdfbf96f86940f Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 2 Dec 2020 22:14:48 +0200 Subject: [PATCH 3/3] Update doc --- .../spark/sql/connector/catalog/SupportsDelete.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java index bee5f5dc6ecb5..261e5344be7b9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java @@ -36,10 +36,10 @@ public interface SupportsDelete { * Rows should be deleted from the data source iff all of the filter expressions match. * That is, the expressions must be interpreted as a set of filters that are ANDed together. *
<p>
- * Spark will call this method to check whether {@link #deleteWhere(Filter[])} would reject - * the delete operation because it requires significant effort. If this method returns false, - * Spark will not call {@link #deleteWhere(Filter[])} and will try to rewrite the delete - * operation and produce row-level changes if the data source table supports deleting + * Spark will call this method at planning time to check whether {@link #deleteWhere(Filter[])} + * would reject the delete operation because it requires significant effort. If this method + * returns false, Spark will not call {@link #deleteWhere(Filter[])} and will try to rewrite + * the delete operation and produce row-level changes if the data source table supports deleting * individual records. * * @param filters filter expressions, used to select rows to delete when all expressions match @@ -50,7 +50,8 @@ default boolean canDeleteWhere(Filter[] filters) { } /** - * Delete data from a data source table that matches filter expressions. + * Delete data from a data source table that matches filter expressions. Note that this method + * will be invoked only if {@link #canDeleteWhere(Filter[])} returns true. *
<p>
* Rows are deleted from the data source iff all of the filter expressions match. That is, the * expressions must be interpreted as a set of filters that are ANDed together.
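
For reference, below is a minimal sketch of how a third-party DSv2 connector might implement the contract described above. The PartitionedDeletableTable class, its constructor parameters, and the "partition columns only" rule are illustrative assumptions, not part of this patch series; only the SupportsDelete methods, the Table interface, and the Filter classes come from the Spark API.

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsDelete, Table, TableCapability}
import org.apache.spark.sql.sources.{EqualTo, Filter, In, IsNotNull}
import org.apache.spark.sql.types.StructType

// Hypothetical connector table that can only delete whole partitions. It advertises this
// limitation through canDeleteWhere, so Spark can fail the DELETE at planning time (or,
// for sources that support row-level changes, rewrite it) instead of calling deleteWhere
// and failing late in the middle of the job.
class PartitionedDeletableTable(
    tableName: String,
    tableSchema: StructType,
    partitionColumns: Set[String]) extends Table with SupportsDelete {

  override def name(): String = tableName

  override def schema(): StructType = tableSchema

  // A real table would advertise read/write capabilities; left empty in this sketch.
  override def capabilities(): util.Set[TableCapability] =
    util.Collections.emptySet[TableCapability]()

  // Called by Spark before deleteWhere; the filters are implicitly ANDed together.
  // Accept the delete only if every filter references a partition column, i.e. the
  // delete can be executed as a cheap metadata-only operation.
  override def canDeleteWhere(filters: Array[Filter]): Boolean = {
    filters.forall {
      case EqualTo(attr, _) => partitionColumns.contains(attr)
      case In(attr, _) => partitionColumns.contains(attr)
      case IsNotNull(attr) => partitionColumns.contains(attr)
      case _ => false
    }
  }

  // Only reached when canDeleteWhere returned true for the same set of filters.
  override def deleteWhere(filters: Array[Filter]): Unit = {
    // Drop the matching partitions from the underlying store (storage details omitted).
  }
}

With such a table (and a partition column p, as in the test above), DELETE FROM t WHERE p = 3 would reach deleteWhere, while DELETE FROM t WHERE data = 'a' would be rejected during planning with the new "Cannot delete from table" AnalysisException added to DataSourceV2Strategy in this patch.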