Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gluten.expression

import org.apache.gluten.execution.GlutenTaskOnlyExpression
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.VeloxBloomFilter

Expand All @@ -31,11 +32,15 @@ import org.apache.spark.task.TaskResources
* Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla
* Spark so produces different intermediate aggregate data. Thus we use different filter function /
* agg function types for Velox's version to distinguish from vanilla Spark's implementation.
*
* FIXME: Remove GlutenTaskOnlyExpression after the VeloxBloomFilter expr is made compatible with
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhztheplayer @jinchengchenghh I added a comment to record this improvement, how about we merge this PR first?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem from my end. cc @jinchengchenghh

And can we log an issue highlighting the performance gap because of the unavailability of scan + might_contain?

Also I guess vanilla Spark's scan + Velox might_contain will cause the issue as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I guess vanilla Spark's scan + Velox might_contain will cause the issue as well?

In what scenarios does a vanilla Spark scan with native expressions occur? Do we have an existing test case?

Copy link
Member

@zhztheplayer zhztheplayer Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant when velox_might_contain is included in a vanilla Spark scan node, which for whatever reason was fallen back by Gluten. I was just guessing whether the same issue will happen in the case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant when velox_might_contain is included in a vanilla Spark scan node, which for whatever reason was fallen back by Gluten. I was just guessing whether the same issue will happen in the case.

Oh, I see.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And can we log an issue highlighting the performance gap because of the unavailability of scan + might_contain?

Filed an issue #10071

* spark. See: https://github.com/apache/incubator-gluten/pull/9850#issuecomment-3007448538
*/
case class VeloxBloomFilterMightContain(
bloomFilterExpression: Expression,
valueExpression: Expression)
extends BinaryExpression {
extends BinaryExpression
with GlutenTaskOnlyExpression {

private val delegate =
SparkShimLoader.getSparkShims.newMightContain(bloomFilterExpression, valueExpression)
Expand Down Expand Up @@ -89,6 +94,7 @@ case class VeloxBloomFilterMightContain(
val valueEval = valueExpression.genCode(ctx)
val code =
code"""
org.apache.gluten.expression.VeloxBloomFilterMightContain.checkInSparkTask();
${valueEval.code}
boolean ${ev.isNull} = ${valueEval.isNull};
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
Expand All @@ -98,3 +104,11 @@ case class VeloxBloomFilterMightContain(
ev.copy(code = code)
}
}

/** Companion helpers for [[VeloxBloomFilterMightContain]]. */
object VeloxBloomFilterMightContain {

  /**
   * Fails fast when the expression is evaluated outside of a Spark task (i.e., on the Driver),
   * where the Velox-backed might_contain implementation is not available. Invoked from the
   * generated code of [[VeloxBloomFilterMightContain]].
   */
  def checkInSparkTask(): Unit =
    if (!TaskResources.inSparkTask()) {
      throw new UnsupportedOperationException("velox_might_contain is not evaluable on Driver")
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

/**
 * Marker trait for expressions that are specific to Gluten execution. An implementation can
 * declare, via [[onlyInSparkTask]], that it must only be evaluated inside a Spark task (and
 * never on the Driver).
 */
trait GlutenExpression {

  /** Whether this expression may only be evaluated inside a Spark task. Defaults to `false`. */
  def onlyInSparkTask(): Boolean = false

}

/** A [[GlutenExpression]] that must only be evaluated inside a Spark task. */
trait GlutenTaskOnlyExpression extends GlutenExpression {

  /** Always `true`: evaluation is restricted to task-side execution. */
  override def onlyInSparkTask(): Boolean = true

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ object ScanTransformerFactory {

def createFileSourceScanTransformer(
scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
// Partition filters will be evaluated in driver side, so we can remove
// GlutenTaskOnlyExpressions
val partitionFilters = scanExec.partitionFilters.filter {
case _: GlutenTaskOnlyExpression => false
case _ => true
}
FileSourceScanExecTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
scanExec.partitionFilters,
partitionFilters,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to consider this issue also for the BatchScanTransformer? Thanks.

Copy link
Member Author

@wForget wForget Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to consider this issue also for the BatchScanTransformer? Thanks.

I will try to add a unit test to cover this case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to consider this issue also for the BatchScanTransformer? Thanks.

There are no partition filters in `BatchScanExecTransformer`, and runtime filters will also be converted to source v2 predicates, so there is no similar issue for `BatchScanExecTransformer`.

scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenInjectRuntimeFilterSuite]
// FIXME: yan
.includeCH("Merge runtime bloom filters")
.excludeGlutenTest("GLUTEN-9849: bloom filter applied to partition filter")
enableSuite[GlutenInnerJoinSuiteForceShjOff]
.excludeCH(
"inner join, one match per row using ShuffledHashJoin (build=left) (whole-stage-codegen off)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@
*/
package org.apache.spark.sql

class GlutenInjectRuntimeFilterSuite
extends InjectRuntimeFilterSuite
with GlutenSQLTestsBaseTrait {}
import org.apache.spark.sql.internal.SQLConf

class GlutenInjectRuntimeFilterSuite extends InjectRuntimeFilterSuite with GlutenSQLTestsBaseTrait {

  // Regression test for GLUTEN-9849: a runtime bloom filter (might_contain) injected into a
  // partition filter must not break the scan. Partition filters are evaluated on the Driver,
  // while velox_might_contain is only evaluable inside a Spark task.
  testGluten("GLUTEN-9849: bloom filter applied to partition filter") {
    withSQLConf(
      // Thresholds presumably tuned so the optimizer injects a bloom filter on the scan side
      // instead of broadcasting or applying dynamic partition pruning — TODO confirm against
      // InjectRuntimeFilterSuite's fixture table sizes.
      SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
      SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000"
    ) {
      // bf5part.f5 is a partition column of the fixture table, so the injected bloom filter
      // lands in the scan's partition filters — the case this suite guards.
      assertRewroteWithBloomFilter(
        "select * from bf5part join bf2 on " +
          "bf5part.f5 = bf2.c2 where bf2.a2 = 67")
    }
  }

}