From 8816d1ad4e1044b229d6dc5cfabf0a1998648a68 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 3 Jun 2025 16:45:38 +0800 Subject: [PATCH 1/5] [GLUTEN-9849][VL] Avoid VeloxBloomFilterMightContain being applied to FileSourceScan partition filters --- .../VeloxBloomFilterMightContain.scala | 13 +++++++- .../gluten/execution/GlutenExpression.scala | 33 +++++++++++++++++++ .../execution/ScanTransformerFactory.scala | 8 ++++- .../sql/GlutenInjectRuntimeFilterSuite.scala | 20 +++++++++-- 4 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala index 05e1e3eb481f..5c98e459262b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.expression +import org.apache.gluten.execution.GlutenTaskOnlyExpression import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.VeloxBloomFilter @@ -35,7 +36,8 @@ import org.apache.spark.task.TaskResources case class VeloxBloomFilterMightContain( bloomFilterExpression: Expression, valueExpression: Expression) - extends BinaryExpression { + extends BinaryExpression + with GlutenTaskOnlyExpression { private val delegate = SparkShimLoader.getSparkShims.newMightContain(bloomFilterExpression, valueExpression) @@ -89,6 +91,7 @@ case class VeloxBloomFilterMightContain( val valueEval = valueExpression.genCode(ctx) val code = code""" + org.apache.gluten.expression.VeloxBloomFilterMightContain.checkInSparkTask(); ${valueEval.code} boolean ${ev.isNull} = ${valueEval.isNull}; ${CodeGenerator.javaType(dataType)} ${ev.value} = 
${CodeGenerator.defaultValue(dataType)}; @@ -98,3 +101,11 @@ case class VeloxBloomFilterMightContain( ev.copy(code = code) } } + +object VeloxBloomFilterMightContain { + def checkInSparkTask(): Unit = { + if (!TaskResources.inSparkTask()) { + throw new UnsupportedOperationException("velox_might_contain is not evaluable on Driver") + } + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala new file mode 100644 index 000000000000..7ea8b558ca6c --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +/** + * GlutenExpression is a marker trait for expressions that are specific to Gluten execution. It can + * be used to identify expressions that should only be evaluated in the context of a Spark task. 
+ */ +trait GlutenExpression { + + def onlyInSparkTask(): Boolean = false + +} + +trait GlutenTaskOnlyExpression extends GlutenExpression { + + override def onlyInSparkTask(): Boolean = true + +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index 2a8cc9138239..bcf209c8ae08 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -25,11 +25,17 @@ object ScanTransformerFactory { def createFileSourceScanTransformer( scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = { + // Partition filters are evaluated on the driver side, so we can remove + // GlutenTaskOnlyExpressions + val partitionFilters = scanExec.partitionFilters.filter { + case _: GlutenTaskOnlyExpression => false + case _ => true + } FileSourceScanExecTransformer( scanExec.relation, scanExec.output, scanExec.requiredSchema, - scanExec.partitionFilters, + partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, scanExec.dataFilters, diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala index 11b6d99828c6..fec30bfe89c4 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala @@ -16,6 +16,20 @@ */ package org.apache.spark.sql -class GlutenInjectRuntimeFilterSuite - extends InjectRuntimeFilterSuite - with GlutenSQLTestsBaseTrait {} +import org.apache.spark.sql.internal.SQLConf + +class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait { + + test("velox bloom 
filter applied to partition filter") { + withSQLConf( + SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", + SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000" + ) { + assertRewroteWithBloomFilter( + "select * from bf5part join bf2 on " + + "bf5part.f5 = bf2.c2 where bf2.a2 = 67") + } + } + +} From a3812238c2f1985e670e3f23f1125074a106c6a7 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 5 Jun 2025 12:56:48 +0800 Subject: [PATCH 2/5] test --- .../apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala index fec30bfe89c4..39a6ba3c3b35 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala @@ -20,12 +20,16 @@ import org.apache.spark.sql.internal.SQLConf class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait { - test("velox bloom filter applied to partition filter") { + test("bloom filter applied to partition filter") { withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000" ) { + val query = "select * from bf5part join bf2 on bf5part.f5 = bf2.c2 where bf2.a2 = 67" + val df = sql(query) + df.collect() + logWarning(s"GlutenInjectRuntimeFilterSuite Plan:\n${df.queryExecution.executedPlan}\n") assertRewroteWithBloomFilter( "select * from bf5part join bf2 on " + "bf5part.f5 = bf2.c2 where bf2.a2 = 67") From 3dd7f486d79158019f9fabb11ee1c5e4dbe6e64d Mon Sep 17 00:00:00 2001 From: wforget 
<643348094@qq.com> Date: Thu, 5 Jun 2025 20:01:55 +0800 Subject: [PATCH 3/5] test --- .../apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala index 39a6ba3c3b35..a19d153d82d2 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala @@ -24,12 +24,15 @@ class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with Glute withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false", + "spark.gluten.sql.native.bloomFilter" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000" ) { val query = "select * from bf5part join bf2 on bf5part.f5 = bf2.c2 where bf2.a2 = 67" val df = sql(query) df.collect() - logWarning(s"GlutenInjectRuntimeFilterSuite Plan:\n${df.queryExecution.executedPlan}\n") + val plan = df.queryExecution.executedPlan.toString() + logWarning(s"GlutenInjectRuntimeFilterSuite Plan:") + plan.split("\n").foreach(logWarning(_)) assertRewroteWithBloomFilter( "select * from bf5part join bf2 on " + "bf5part.f5 = bf2.c2 where bf2.a2 = 67") From a1c72963af5293d4dde592075f37b26ff134f446 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Fri, 6 Jun 2025 10:13:48 +0800 Subject: [PATCH 4/5] fix --- .../gluten/utils/clickhouse/ClickHouseTestSettings.scala | 1 + .../spark/sql/GlutenInjectRuntimeFilterSuite.scala | 9 +-------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 9026fefffd95..0860b5e3e7ea 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -901,6 +901,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenInjectRuntimeFilterSuite] // FIXME: yan .includeCH("Merge runtime bloom filters") + .excludeGlutenTest("GLUTEN-9849: bloom filter applied to partition filter") enableSuite[GlutenInnerJoinSuiteForceShjOff] .excludeCH( "inner join, one match per row using ShuffledHashJoin (build=left) (whole-stage-codegen off)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala index a19d153d82d2..26e11f84387a 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala @@ -20,19 +20,12 @@ import org.apache.spark.sql.internal.SQLConf class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait { - test("bloom filter applied to partition filter") { + testGluten("GLUTEN-9849: bloom filter applied to partition filter") { withSQLConf( SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false", - "spark.gluten.sql.native.bloomFilter" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000" ) { - val query = "select * from bf5part join bf2 on bf5part.f5 = bf2.c2 where bf2.a2 = 67" - val df = sql(query) - df.collect() - val plan = df.queryExecution.executedPlan.toString() - logWarning(s"GlutenInjectRuntimeFilterSuite Plan:") - 
plan.split("\n").foreach(logWarning(_)) assertRewroteWithBloomFilter( "select * from bf5part join bf2 on " + "bf5part.f5 = bf2.c2 where bf2.a2 = 67") From a2ace5e9ad1ab6148f26ea0ea3f64816b2b78514 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 26 Jun 2025 16:49:31 +0800 Subject: [PATCH 5/5] add comment --- .../gluten/expression/VeloxBloomFilterMightContain.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala index 5c98e459262b..3821fd001c0b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala @@ -32,6 +32,9 @@ import org.apache.spark.task.TaskResources * Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla * Spark so produces different intermediate aggregate data. Thus we use different filter function / * agg function types for Velox's version to distinguish from vanilla Spark's implementation. + * + * FIXME: Remove GlutenTaskOnlyExpression after the VeloxBloomFilter expr is made compatible with + * Spark. See: https://github.com/apache/incubator-gluten/pull/9850#issuecomment-3007448538 */ case class VeloxBloomFilterMightContain( bloomFilterExpression: Expression,