[WIP] String functions v1: contains, startsWith, endsWith. #8
base: collation_v2
Conversation
```
      found = true
    }
  }
  found
```
Note: for all collations that do have a "collator" (collationID >= 2), a manual implementation is required. It appears to me that all manual implementations for built-in string functions will essentially boil down to string manipulation plus "collator.compare" calls to produce the intended result.
While this implementation resembles the corresponding "contains" library implementation in UTF8String.java, this one could probably be more efficient - open to suggestions!
Also note: while it's thoroughly tested already, here's a quick fiddle for the Scala code: https://scastie.scala-lang.org/sC4SOgI8T9a9lp0y1h18fg
Update: this code is now Java-only (implemented directly in CollationFactory::Collation) in the new PR, but here's an equivalent Java fiddle for experimental purposes only: https://onecompiler.com/java/42578zbpf
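To make this concrete, here is a minimal, self-contained Java sketch of a collation-aware contains built around `Collator.compare`, in the spirit described above. The class and method names are invented for illustration, and the fixed-length substring matching is a simplification (real collations can match spans of different lengths due to contractions and expansions); this is not the PR's actual CollationFactory code.
```
import java.text.Collator;
import java.util.Locale;

public class CollationContainsSketch {
  // Scans `source` and reports whether any substring with the same length
  // as `target` compares equal to `target` under the given collator.
  static boolean collationAwareContains(String source, String target, Collator collator) {
    if (target.isEmpty()) return true;
    if (source.length() < target.length()) return false;
    for (int i = 0; i <= source.length() - target.length(); i++) {
      String candidate = source.substring(i, i + target.length());
      if (collator.compare(candidate, target) == 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Collator ci = Collator.getInstance(Locale.ROOT);
    ci.setStrength(Collator.SECONDARY); // case differences are tertiary, so they are ignored
    System.out.println(collationAwareContains("abc", "C", ci)); // true
    System.out.println(collationAwareContains("abc", "d", ci)); // false
  }
}
```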
```
override def compare(l: UTF8String, r: UTF8String): Boolean = {
  val lCollationID = left.dataType.asInstanceOf[StringType].collationId
  val rCollationID = right.dataType.asInstanceOf[StringType].collationId
```
I think that you should just fail if the collations are not equal at this point. Maybe you can create some common trait that ensures the collations are equal (e.g. in StringPredicate).
That would currently disable queries like:
```
contains("abc", collate("C", "UNICODE_CI"))  // collation IDs: 0, 3
contains(collate("abc", "UNICODE_CI"), "C")  // collation IDs: 3, 0
```
And only leave enabled queries like:
```
contains(collate("abc", "UNICODE_CI"), collate("C", "UNICODE_CI"))  // collation IDs: 3, 3
contains("abc", "c")                                                // collation IDs: 0, 0
```
Shouldn't we essentially allow different collation IDs, as long as they are "compatible" (whatever that might mean in the context of the current CollationFactory implementation), or otherwise force them to be equal at some point before they reach this part?
We will take care of literal casting as a separate track. @stefankandic as FYI.
But I don't think that every operator needs to do such checks.
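For illustration only, here is a hedged Java sketch of one possible reading of "compatible": the default collation yields to any explicit collation, while two different explicit collations fail. The `resolveCollationIds` helper and the ID conventions are assumptions for this example, not the PR's actual rules.
```
// A minimal sketch, assuming collation ID 0 denotes the implicit default
// (UCS_BASIC) and any other ID denotes an explicitly collated string.
public class CollationResolutionSketch {
  static final int DEFAULT_COLLATION_ID = 0;

  // Returns the collation ID both sides should be compared under,
  // or throws if two different explicit collations are mixed.
  static int resolveCollationIds(int left, int right) {
    if (left == right) return left;
    if (left == DEFAULT_COLLATION_ID) return right; // implicit side adopts the explicit collation
    if (right == DEFAULT_COLLATION_ID) return left;
    throw new IllegalArgumentException("Incompatible collations: " + left + " vs " + right);
  }

  public static void main(String[] args) {
    System.out.println(resolveCollationIds(0, 3)); // 3, as in contains("abc", collate("C", "UNICODE_CI"))
    System.out.println(resolveCollationIds(3, 3)); // 3, both sides explicitly UNICODE_CI
    System.out.println(resolveCollationIds(0, 0)); // 0, plain binary comparison
  }
}
```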
```
  GenerateUnsafeProjection.generate(StartsWith(Literal("\"quote"), Literal("\"quote")) :: Nil)
}

test("Support contains string expression with Collation") {
```
Kudos for adding so many tests!
```
// Test 'contains' with different collations

// UCS_BASIC (default)
checkEvaluation(Contains(Literal(""), Literal("")), true)
```
I would rather test these using the sql method.
StringExpressions are usually tested like this in StringExpressionsSuite.scala. These tests seem to be very handy for initial debugging and, more generally, for upgrading collation support in string expressions.
On the other hand, I added a bunch of sql tests in CollationSuite.scala too! While I think it would be good to keep both for now, perhaps we can drop one of these suites in the future?
Make `ResolveRelations` handle plan id properly
### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly
### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.
Before this PR:
```
from pyspark.sql import functions as sf
spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")
df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")
join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)
join2.schema
```
fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```
This is because the existing plan caching in `ResolveRelations` doesn't work with Spark Connect:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
'[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2]
! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index)
! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
! +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false
Can not resolve 'id with plan 7
```
`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
+- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```
### Does this PR introduce _any_ user-facing change?
Yes, bug fix.
### How was this patch tested?
Added unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#45214 from zhengruifeng/connect_fix_read_join.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This is a trivial change that replaces the loop index type from `int` to `long`. Surprisingly, the microbenchmark shows a more than 2x performance uplift.
Analysis
--------
The hot loop of the `arrayEquals` method is simplified below. The loop index `i` is defined as an `int`, and it is compared with `length`, which is a `long`, to determine whether the loop should end.
```
public static boolean arrayEquals(
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
......
int i = 0;
while (i <= length - 8) {
if (Platform.getLong(leftBase, leftOffset + i) !=
Platform.getLong(rightBase, rightOffset + i)) {
return false;
}
i += 8;
}
......
}
```
Strictly speaking, there's a code bug here. If `length` is greater than 2^31 + 8, this loop will never end, because `i`, as a 32-bit integer, is at most 2^31 - 1. But the compiler must treat this behavior as intentional and generate code that strictly matches the logic, which prevents it from generating optimal code.
Defining the loop index `i` as a `long` fixes this issue. Besides making the code logic more accurate, it allows the JIT to optimize this code much more aggressively. The microbenchmark shows that this trivial change improves performance significantly on both Arm and x86 platforms.
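For reference, here is a minimal sketch of the fixed method; the byte-wise tail comparison standing in for the elided `......` sections is an assumption for this example:
```
public static boolean arrayEquals(
    Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
  long i = 0; // long index: comparing against the long `length` needs no sign extension
  while (i <= length - 8) {
    if (Platform.getLong(leftBase, leftOffset + i) !=
        Platform.getLong(rightBase, rightOffset + i)) {
      return false;
    }
    i += 8;
  }
  // Tail: compare the remaining (up to 7) bytes one at a time.
  while (i < length) {
    if (Platform.getByte(leftBase, leftOffset + i) !=
        Platform.getByte(rightBase, rightOffset + i)) {
      return false;
    }
    i += 1;
  }
  return true;
}
```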
Benchmark
---------
Source code:
https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a
Result on Arm Neoverse N2:
```
Benchmark Mode Cnt Score Error Units
ArrayEqualsBenchmark.arrayEqualsInt avgt 10 674.313 ± 0.213 ns/op
ArrayEqualsBenchmark.arrayEqualsLong avgt 10 313.563 ± 2.338 ns/op
```
Result on Intel Cascade Lake:
```
Benchmark Mode Cnt Score Error Units
ArrayEqualsBenchmark.arrayEqualsInt avgt 10 1130.695 ± 0.168 ns/op
ArrayEqualsBenchmark.arrayEqualsLong avgt 10 461.979 ± 0.097 ns/op
```
Deep dive
---------
Diving down to the machine-code level, we can see why the gap is so big. Listed below is the arm64 assembly generated by the OpenJDK 17 C2 compiler.
For `int i`, the machine code is similar to the source code, with no deep optimization. Safepoint polling is expensive in this short loop.
```
// jit c2 machine code snippet
0x0000ffff81ba8904: mov w15, wzr // int i = 0
0x0000ffff81ba8908: nop
0x0000ffff81ba890c: nop
loop:
0x0000ffff81ba8910: ldr x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i)
0x0000ffff81ba8914: ldr x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i)
0x0000ffff81ba8918: cmp x10, x14
0x0000ffff81ba891c: b.ne 0x0000ffff81ba899c // return false if not equal
0x0000ffff81ba8920: ldr x14, [x28, #848] // x14 -> safepoint
0x0000ffff81ba8924: add w15, w15, #0x8 // i += 8
0x0000ffff81ba8928: ldr wzr, [x14] // safepoint polling
0x0000ffff81ba892c: sxtw x10, w15 // extend i to long
0x0000ffff81ba8930: cmp x10, x11
0x0000ffff81ba8934: b.le 0x0000ffff81ba8910 // if (i <= length - 8) goto loop
```
For `long i`, the JIT is able to optimize much more aggressively. E.g., the code snippet below unrolls the loop by a factor of four.
```
// jit c2 machine code snippet
unrolled_loop:
0x0000ffff91de6fe0: sxtw x10, w7
0x0000ffff91de6fe4: add x23, x22, x10
0x0000ffff91de6fe8: add x24, x21, x10
0x0000ffff91de6fec: ldr x13, [x23] // unroll-1
0x0000ffff91de6ff0: ldr x14, [x24]
0x0000ffff91de6ff4: cmp x13, x14
0x0000ffff91de6ff8: b.ne 0x0000ffff91de70a8
0x0000ffff91de6ffc: ldr x13, [x23, #8] // unroll-2
0x0000ffff91de7000: ldr x14, [x24, #8]
0x0000ffff91de7004: cmp x13, x14
0x0000ffff91de7008: b.ne 0x0000ffff91de70b4
0x0000ffff91de700c: ldr x13, [x23, #16] // unroll-3
0x0000ffff91de7010: ldr x14, [x24, #16]
0x0000ffff91de7014: cmp x13, x14
0x0000ffff91de7018: b.ne 0x0000ffff91de70a4
0x0000ffff91de701c: ldr x13, [x23, #24] // unroll-4
0x0000ffff91de7020: ldr x14, [x24, #24]
0x0000ffff91de7024: cmp x13, x14
0x0000ffff91de7028: b.ne 0x0000ffff91de70b0
0x0000ffff91de702c: add w7, w7, #0x20
0x0000ffff91de7030: cmp w7, w11
0x0000ffff91de7034: b.lt 0x0000ffff91de6fe0
```
### What changes were proposed in this pull request?
A trivial change replacing the loop index `i` of the `arrayEquals` method from `int` to `long`.
### Why are the changes needed?
To improve performance and fix a possible bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#49568 from cyb70289/arrayEquals.
Authored-by: Yibo Cai <cyb70289@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Refactor built-in string functions to support collation for: contains, startsWith, endsWith.
### Why are the changes needed?
Add collation support for built-in string functions in Spark.
### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use COLLATE within arguments for the built-in string functions CONTAINS, STARTSWITH, and ENDSWITH in Spark SQL queries.
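A rough usage sketch in Java (assuming a `SparkSession` built against this PR's branch; the expected results follow the UNICODE_CI examples from the review thread):
```
import org.apache.spark.sql.SparkSession;

public class CollatedStringFunctionsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("collation-usage-sketch")
        .getOrCreate();

    // COLLATE inside arguments of the refactored string functions.
    spark.sql("SELECT contains(collate('abc', 'UNICODE_CI'), 'C')").show();   // true
    spark.sql("SELECT startswith(collate('abc', 'UNICODE_CI'), 'A')").show(); // true
    spark.sql("SELECT endswith(collate('abc', 'UNICODE_CI'), 'C')").show();   // true

    spark.stop();
  }
}
```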
### How was this patch tested?
Unit tests for: string expression evaluation with collations (StringExpressionsSuite.scala) and SQL-level collation checks (CollationSuite.scala).
### Was this patch authored or co-authored using generative AI tooling?
Yes.