
Commit 2c6f4d6

dilipbiswal authored and gatorsmile committed
[SPARK-25610][SQL][TEST] Improve execution time of DatasetCacheSuite: cache UDF result correctly
## What changes were proposed in this pull request?

This test case verifies that the result of a UDF is cached when the underlying DataFrame is cached, and that the UDF is not evaluated again when the cached DataFrame is reused. To reduce the runtime we:

1) Use a single-partition DataFrame, so the total execution time of the UDF is more deterministic.
2) Cut the size of the DataFrame from 10 rows down to 2.
3) Reduce the sleep time in the UDF from 5 seconds to 2 seconds.
4) Reduce the `failAfter` limit from 3 seconds to 2 seconds.

With these changes, caching the first DataFrame takes about 4 seconds, and the subsequent check takes a few hundred milliseconds. The runtimes for 5 consecutive runs of this test are as follows:

```
[info] - cache UDF result correctly (4 seconds, 906 milliseconds)
[info] - cache UDF result correctly (4 seconds, 281 milliseconds)
[info] - cache UDF result correctly (4 seconds, 288 milliseconds)
[info] - cache UDF result correctly (4 seconds, 355 milliseconds)
[info] - cache UDF result correctly (4 seconds, 280 milliseconds)
```

## How was this patch tested?

This is a test-only fix.

Closes #22638 from dilipbiswal/SPARK-25610.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
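For reference, the caching behavior this test exercises can be reproduced outside the suite. Below is a minimal, hypothetical sketch (not the suite code itself; it assumes a local SparkSession, casts the range column to Int explicitly, and uses manual timing in place of ScalaTest's `failAfter`):

```scala
// Minimal standalone sketch of the cache-UDF pattern described above.
// Hypothetical example, not DatasetCacheSuite code.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, udf}

object CacheUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cache-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Expensive UDF: sleeps 2 seconds per input row.
    val expensiveUDF = udf({ x: Int => Thread.sleep(2000); x })

    // Two rows in a single partition, so caching costs roughly 4 seconds total.
    val df = spark.range(0, 2).toDF("a").repartition(1)
      .withColumn("b", expensiveUDF($"a".cast("int")))

    df.cache()
    df.count() // materializes the cache; the UDF is evaluated here, once per row

    // The dependent aggregation should read the cached column rather than
    // re-evaluating the UDF, so it finishes in well under 2 seconds.
    val start = System.nanoTime()
    df.agg(sum(df("b"))).collect()
    println(s"agg over cached UDF column took ${(System.nanoTime() - start) / 1e6} ms")

    spark.stop()
  }
}
```

With a single partition and two rows, materializing the cache pays the roughly 4 seconds of UDF sleep once; the subsequent aggregation reads the cached column and completes in a few hundred milliseconds, which is what the 2-second `failAfter` bound in the test asserts.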
1 parent bbd038d commit 2c6f4d6

File tree

1 file changed, +3 −3 lines changed


sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -127,16 +127,16 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
   }

   test("cache UDF result correctly") {
-    val expensiveUDF = udf({x: Int => Thread.sleep(5000); x})
-    val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+    val expensiveUDF = udf({x: Int => Thread.sleep(2000); x})
+    val df = spark.range(0, 2).toDF("a").repartition(1).withColumn("b", expensiveUDF($"a"))
     val df2 = df.agg(sum(df("b")))

     df.cache()
     df.count()
     assertCached(df2)

     // udf has been evaluated during caching, and thus should not be re-evaluated here
-    failAfter(3 seconds) {
+    failAfter(2 seconds) {
       df2.collect()
     }
