diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala index 05e1e3eb481f..3821fd001c0b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.expression +import org.apache.gluten.execution.GlutenTaskOnlyExpression import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.VeloxBloomFilter @@ -31,11 +32,15 @@ import org.apache.spark.task.TaskResources * Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla * Spark so produces different intermediate aggregate data. Thus we use different filter function / * agg function types for Velox's version to distinguish from vanilla Spark's implementation. + * + * FIXME: Remove GlutenTaskOnlyExpression after the VeloxBloomFilter expr is made compatible with + * spark. See: https://github.com/apache/incubator-gluten/pull/9850#issuecomment-3007448538 */ case class VeloxBloomFilterMightContain( bloomFilterExpression: Expression, valueExpression: Expression) - extends BinaryExpression { + extends BinaryExpression + with GlutenTaskOnlyExpression { private val delegate = SparkShimLoader.getSparkShims.newMightContain(bloomFilterExpression, valueExpression) @@ -89,6 +94,7 @@ case class VeloxBloomFilterMightContain( val valueEval = valueExpression.genCode(ctx) val code = code""" + org.apache.gluten.expression.VeloxBloomFilterMightContain.checkInSparkTask(); ${valueEval.code} boolean ${ev.isNull} = ${valueEval.isNull}; ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; @@ -98,3 +104,11 @@ case class VeloxBloomFilterMightContain( ev.copy(code = code) } } + +object VeloxBloomFilterMightContain { + def checkInSparkTask(): Unit = { + if (!TaskResources.inSparkTask()) { + throw new UnsupportedOperationException("velox_might_contain is not evaluable on Driver") + } + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala new file mode 100644 index 000000000000..7ea8b558ca6c --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenExpression.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +/** + * GlutenExpression is a marker trait for expressions that are specific to Gluten execution. It can + * be used to identify expressions that should only be evaluated in the context of a Spark task. + */ +trait GlutenExpression { + + def onlyInSparkTask(): Boolean = false + +} + +trait GlutenTaskOnlyExpression extends GlutenExpression { + + override def onlyInSparkTask(): Boolean = true + +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index 2a8cc9138239..bcf209c8ae08 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -25,11 +25,17 @@ object ScanTransformerFactory { def createFileSourceScanTransformer( scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = { + // Partition filters will be evaluated in driver side, so we can remove + // GlutenTaskOnlyExpressions + val partitionFilters = scanExec.partitionFilters.filter { + case _: GlutenTaskOnlyExpression => false + case _ => true + } FileSourceScanExecTransformer( scanExec.relation, scanExec.output, scanExec.requiredSchema, - scanExec.partitionFilters, + partitionFilters, scanExec.optionalBucketSet, scanExec.optionalNumCoalescedBuckets, scanExec.dataFilters, diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 9026fefffd95..0860b5e3e7ea 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -901,6 +901,7 @@ class ClickHouseTestSettings extends BackendTestSettings { enableSuite[GlutenInjectRuntimeFilterSuite] // FIXME: yan .includeCH("Merge runtime bloom filters") + .excludeGlutenTest("GLUTEN-9849: bloom filter applied to partition filter") enableSuite[GlutenInnerJoinSuiteForceShjOff] .excludeCH( "inner join, one match per row using ShuffledHashJoin (build=left) (whole-stage-codegen off)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala index 11b6d99828c6..26e11f84387a 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenInjectRuntimeFilterSuite.scala @@ -16,6 +16,20 @@ */ package org.apache.spark.sql -class GlutenInjectRuntimeFilterSuite - extends InjectRuntimeFilterSuite - with GlutenSQLTestsBaseTrait {} +import org.apache.spark.sql.internal.SQLConf + +class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait { + + testGluten("GLUTEN-9849: bloom filter applied to partition filter") { + withSQLConf( + SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000", + SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000" + ) { + assertRewroteWithBloomFilter( + "select * from bf5part join bf2 on " + + "bf5part.f5 = bf2.c2 where bf2.a2 = 67") + } + } + +}