@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
+import scala.util.Random
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -47,11 +49,13 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
 abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private val maxI = 5
+  private val maxJ = 13
+  private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
   private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
-  } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
+  } yield (i % maxI, s, i % maxJ)).toDF("i", "j", "k")
 
   // number of buckets that doesn't yield empty buckets when bucketing on column j on df/nullDF
   // empty buckets before filtering might hide bugs in pruning logic
@@ -66,23 +70,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(8, "j", "k")
         .saveAsTable("bucketed_table")
 
-      for (i <- 0 until 3) {
-        val table = spark.table("bucketed_table").filter($"i" === i)
-        val query = table.queryExecution
-        val output = query.analyzed.output
-        val rdd = query.toRdd
-
-        assert(rdd.partitions.length == 8)
-
-        val attrs = table.select("j", "k").queryExecution.analyzed.output
-        val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
-          val getBucketId = UnsafeProjection.create(
-            HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
-            output)
-          rows.map(row => getBucketId(row).getInt(0) -> index)
-        })
-        checkBucketId.collect().foreach(r => assert(r._1 == r._2))
-      }
+      val bucketValue = Random.nextInt(maxI)
+      val table = spark.table("bucketed_table").filter($"i" === bucketValue)
+      val query = table.queryExecution
+      val output = query.analyzed.output
+      val rdd = query.toRdd
+
+      assert(rdd.partitions.length == 8)
+
+      val attrs = table.select("j", "k").queryExecution.analyzed.output
+      val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
+        val getBucketId = UnsafeProjection.create(
+          HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
+          output)
+        rows.map(row => getBucketId(row).getInt(0) -> index)
+      })
+      checkBucketId.collect().foreach(r => assert(r._1 == r._2))
     }
   }
 
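The hunk above swaps the loop over `0 until 3` for a single `Random.nextInt(maxI)` pick, trading exhaustive coverage for a faster run. Since `df` is built from `i % maxI` over `0 until 50`, every value in `0 until maxI` exists in the table, so any pick hits a non-empty bucket. A minimal sketch, assuming one wanted to make such a pick replayable on failure; the seed handling and names here are illustrative, not part of this patch:

```scala
import scala.util.Random

object SeededBucketPick {
  def main(args: Array[String]): Unit = {
    val maxI = 5                         // modulus used to build column "i" in df
    val seed = System.nanoTime()         // record the seed so a failing run can be replayed
    val rng = new Random(seed)
    val bucketValue = rng.nextInt(maxI)  // uniform pick from 0 until maxI
    println(s"seed=$seed bucketValue=$bucketValue")
  }
}
```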
@@ -145,36 +148,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 2) {
-        // Case 1: EqualTo
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-
-        // Case 2: EqualNullSafe
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" <=> j,
-          df)
-
-        // Case 3: In
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = $"j".isin(j, j + 1, j + 2, j + 3),
-          df)
-
-        // Case 4: InSet
-        val inSetExpr = expressions.InSet($"j".expr, Set(j, j + 1, j + 2, j + 3).map(lit(_).expr))
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = Column(inSetExpr),
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      // Case 1: EqualTo
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
+
+      // Case 2: EqualNullSafe
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" <=> bucketValue,
+        df)
+
+      // Case 3: In
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = $"j".isin(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        df)
+
+      // Case 4: InSet
+      val inSetExpr = expressions.InSet($"j".expr,
+        Set(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3).map(lit(_).expr))
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = Column(inSetExpr),
+        df)
     }
   }
 
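All four cases above (EqualTo, EqualNullSafe, In, InSet) reduce to one question: which bucket files can contain the filter's literal values? A stand-in model of that mapping, assuming a plain modulo hash; Spark's actual bucketing uses a Murmur3-based hash, so the concrete bucket ids differ:

```scala
object PruningModel {
  // Map one literal to its bucket; non-negative modulo stands in for Spark's hash.
  def bucketFor(value: Int, numBuckets: Int): Int =
    ((value % numBuckets) + numBuckets) % numBuckets

  // EqualTo/EqualNullSafe contribute one literal; In/InSet contribute several.
  def bucketsToScan(filterValues: Seq[Int], numBuckets: Int): Set[Int] =
    filterValues.map(bucketFor(_, numBuckets)).toSet
}
```

This is why `bucketValues` in each call lists exactly the literals appearing in `filterCondition`: the test expects only the buckets those literals map to.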
@@ -188,13 +191,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 2) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
     }
   }
 
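The comment near the top of the suite explains why the bucket count matters here: empty buckets before filtering might hide bugs in the pruning logic. A quick way to sanity-check a candidate count, reusing the stand-in hash from the sketch above (illustrative only; Spark's real hash distributes values differently):

```scala
object BucketCountCheck {
  // True when every bucket receives at least one j value under the stand-in hash.
  def noEmptyBuckets(numBuckets: Int, maxJ: Int): Boolean = {
    val occupied = (0 until maxJ).map(j => ((j % numBuckets) + numBuckets) % numBuckets).toSet
    occupied.size == numBuckets
  }

  def main(args: Array[String]): Unit = {
    println(noEmptyBuckets(numBuckets = 8, maxJ = 13))   // true: 0..12 mod 8 covers 0..7
    println(noEmptyBuckets(numBuckets = 16, maxJ = 13))  // false: only 13 of 16 buckets filled
  }
}
```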
@@ -236,40 +238,39 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 2) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"k" > $"j",
-          df)
-
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"i" > j % 5,
-          df)
-
-        // check multiple bucket values OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1),
-          filterCondition = $"j" === j || $"j" === (j + 1),
-          df)
-
-        // check bucket value and non-bucket value OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Nil,
-          filterCondition = $"j" === j || $"i" === 0,
-          df)
-
-        // check AND condition in complex expression
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j),
-          filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"k" > $"j",
+        df)
+
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"i" > bucketValue % 5,
+        df)
+
+      // check multiple bucket values OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1),
+        filterCondition = $"j" === bucketValue || $"j" === (bucketValue + 1),
+        df)
+
+      // check bucket value and non-bucket value OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Nil,
+        filterCondition = $"j" === bucketValue || $"i" === 0,
+        df)
+
+      // check AND condition in complex expression
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue),
+        filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === bucketValue,
+        df)
     }
   }
 
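The `bucketValues = Nil` case is the interesting one: an OR between a bucket-column predicate and a non-bucket-column predicate cannot prune, because the non-bucket side may match rows in any bucket. Extending the stand-in model from earlier, one plausible way to express the combination rules this test exercises (a sketch, not Spark's implementation):

```scala
object BucketPruningModel {
  sealed trait Pruning
  final case class ScanBuckets(ids: Set[Int]) extends Pruning
  case object ScanAll extends Pruning

  // OR can only prune when both sides prune; one unprunable side forces a full scan.
  def combineOr(a: Pruning, b: Pruning): Pruning = (a, b) match {
    case (ScanBuckets(x), ScanBuckets(y)) => ScanBuckets(x union y)
    case _                                => ScanAll
  }

  // AND may prune as soon as either side prunes.
  def combineAnd(a: Pruning, b: Pruning): Pruning = (a, b) match {
    case (ScanBuckets(x), ScanBuckets(y)) => ScanBuckets(x intersect y)
    case (ScanBuckets(x), ScanAll)        => ScanBuckets(x)
    case (ScanAll, ScanBuckets(y))        => ScanBuckets(y)
    case (ScanAll, ScanAll)               => ScanAll
  }
}
```

Under this model, `$"j" === bucketValue || $"j" === (bucketValue + 1)` yields two buckets, `$"j" === bucketValue || $"i" === 0` collapses to a full scan, and `($"i" === 0 || $"k" > $"j") && $"j" === bucketValue` still prunes to the single bucket for `bucketValue`, matching the `bucketValues` arguments above.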