Add support for arrays in hashaggregate [databricks] #7465

Merged: 18 commits, Jul 13, 2023. Changes shown from 5 commits.
72 changes: 36 additions & 36 deletions docs/supported_ops.md
@@ -556,7 +556,7 @@ Accelerator supports are described below.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Struct as child;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><em>PS<br/>not allowed for grouping expressions if containing Array or Map as child;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, UDT</em></td>
<td><b>NS</b></td>
@@ -748,7 +748,7 @@ Accelerator supports are described below.
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><em>PS<br/>Round-robin partitioning is not supported if spark.sql.execution.sortBeforeRepartition is true;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>Round-robin partitioning is not supported if spark.sql.execution.sortBeforeRepartition is true;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><em>PS<br/>Round-robin partitioning is not supported for nested structs if spark.sql.execution.sortBeforeRepartition is true;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types CALENDAR, UDT</em></td>
<td><b>NS</b></td>
@@ -7969,45 +7969,45 @@ are limited.
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td>S</td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td>S</td>
</tr>
<tr>
<td rowSpan="2">KnownNotNull</td>
@@ -18967,9 +18967,9 @@ as `a` don't show up in the table. They are controlled by the rules for
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
24 changes: 22 additions & 2 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -116,6 +116,19 @@
('b', FloatGen(nullable=(True, 10.0), special_cases=[(float('nan'), 10.0)])),
('c', LongGen())]

# grouping single-level lists
_grpkey_list_with_non_nested_children = [[('a', RepeatSeqGen(ArrayGen(data_gen), length=3)),
('b', IntegerGen())] for data_gen in all_basic_gens + decimal_gens]

# grouping multiple-level structs with arrays
_grpkey_nested_structs_with_array_basic_child = [
('a', RepeatSeqGen(StructGen([
['aa', IntegerGen()],
['ab', ArrayGen(IntegerGen())]]),
length=20)),
('b', IntegerGen()),
('c', NullGen())]

_nan_zero_float_special_cases = [
(float('nan'), 5.0),
(NEG_FLOAT_NAN_MIN_VALUE, 5.0),
@@ -318,7 +331,7 @@ def test_hash_reduction_decimal_overflow_sum(precision):
# some optimizations are conspiring against us.
conf = {'spark.rapids.sql.batchSizeBytes': '128m'})

@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
@pytest.mark.parametrize('data_gen', [_grpkey_nested_structs_with_array_basic_child, _longs_with_nulls] + _grpkey_list_with_non_nested_children, ids=idfn)
def test_hash_grpby_sum_count_action(data_gen):
assert_gpu_and_cpu_row_counts_equal(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b'))
@@ -1707,6 +1720,13 @@ def do_it(spark):
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.sql.ansi.enabled': 'true'})

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [ArrayGen(sub_gen, nullable=False) for sub_gen in all_basic_gens + decimal_gens])
def test_hash_aggregate_grpby_array_agg_on_first(data_gen):
gen = RepeatSeqGen(data_gen, length=10)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, gen, IntegerGen())
.groupBy('a').agg(f.first('b')))

# Tests for standard deviation and variance aggregations.
@ignore_order(local=True)
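In user-facing terms, the tests added above exercise queries of roughly this shape (a spark-shell sketch; the data and column names are illustrative, not taken from the PR):

// Spark-shell sketch: group by an array-of-primitives key, which this PR
// keeps on the GPU. sum mirrors test_hash_grpby_sum_count_action and
// first mirrors test_hash_aggregate_grpby_array_agg_on_first.
import org.apache.spark.sql.functions.{first, sum}

val df = Seq(
  (Seq(1, 2, 3), 10L),
  (Seq(1, 2, 3), 20L),
  (Seq(4, 5), 30L)
).toDF("a", "b")

df.groupBy("a").agg(sum("b"), first("b")).show()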
15 changes: 14 additions & 1 deletion integration_tests/src/main/python/repart_test.py
@@ -214,10 +214,23 @@ def test_round_robin_sort_fallback(data_gen):
lambda spark : gen_df(spark, data_gen).withColumn('extra', lit(1)).repartition(13),
'ShuffleExchangeExec')

@allow_non_gpu("ProjectExec", "ShuffleExchangeExec")
@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('gen', [([('ag', ArrayGen(StructGen([('b1', long_gen)])))], ['ag'])], ids=idfn)
def test_hash_repartition_exact_fallback(gen, num_parts):
data_gen = gen[0]
part_on = gen[1]
assert_gpu_fallback_collect(
lambda spark : gen_df(spark, data_gen, length=1024) \
.repartition(num_parts, *part_on) \
.withColumn('id', f.spark_partition_id()) \
.selectExpr('*'), "ShuffleExchangeExec")

@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [1, 2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('gen', [
([('a', boolean_gen)], ['a']),
([('a', boolean_gen)], ['a']),
([('a', byte_gen)], ['a']),
([('a', short_gen)], ['a']),
([('a', int_gen)], ['a']),
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1509,9 +1509,7 @@ object GpuOverrides extends Logging {
}),
expr[KnownFloatingPointNormalized](
"Tag to prevent redundant normalization",
ExprChecks.unaryProjectInputMatchesOutput(
TypeSig.DOUBLE + TypeSig.FLOAT,
TypeSig.DOUBLE + TypeSig.FLOAT),
ExprChecks.unaryProjectInputMatchesOutput(TypeSig.all, TypeSig.all),
(a, conf, p, r) => new UnaryExprMeta[KnownFloatingPointNormalized](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression =
GpuKnownFloatingPointNormalized(child)
@@ -3562,11 +3560,26 @@ object GpuOverrides extends Logging {
// This needs to match what murmur3 supports.
PartChecks(RepeatingParamCheck("hash_key",
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
TypeSig.STRUCT).nested(), TypeSig.all)),
TypeSig.STRUCT + TypeSig.ARRAY).nested(),
TypeSig.all)
),
(hp, conf, p, r) => new PartMeta[HashPartitioning](hp, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] =
hp.expressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def tagPartForGpu(): Unit = {
val arrayWithStructsHashing = hp.expressions.exists(e =>
TrampolineUtil.dataTypeExistsRecursively(e.dataType,
dt => dt match {
case ArrayType(_: StructType, _) => true
case _ => false
})
)
if (arrayWithStructsHashing) {
willNotWorkOnGpu("hashing arrays with structs is not supported")
Review thread on this check:

Collaborator: Is there a follow-on issue to fix this? Have we tested that this does not work?

Collaborator (author): @sameerz, is the hashing function for Array[Struct] supported in cudf, or is there an issue tracking that?

Collaborator: In cudf, lists of structs and structs of lists are not yet supported (tracking issue rapidsai/cudf#11222).

Collaborator: Also, spark-rapids tracking issue: #5109

Collaborator (author): @ttnghia, the cudf issue you tagged, rapidsai/cudf#11222, seems to be about sorting a list of structs; is that the same for hashing? I tested passing a list of structs as a groupBy key and cudf didn't like it.

Collaborator (author): If I understand you correctly: I am aware that we don't have any control in the plugin over which aggregation is used; cudf will pick hash or sort. But the line this comment references is explicitly about HashPartitioning, so I think this very explicitly concerns only that. To prove it, I commented out this check, tried a groupBy on a List[Struct], and got an error from cudf saying that murmur hash is not implemented for List[Struct]. This is why I was confused that rapidsai/cudf#11222, which is completed, is tagged here when it deals with sorting a list and clearly doesn't fix the problem with groupBy on List[Struct].

Collaborator: Then we have to answer the question of what HashPartitioning does for groupBy.

Collaborator (author, razajafri, Jul 6, 2023): If groupBy uses HashAggregate, it will use HashPartitioning to create buckets with buffers pointing to values. So in this case it will try to calculate the hash for a List[Struct], which isn't supported by cudf at the moment.

scala> df.groupBy("_1","_2").count().explain
== Physical Plan ==
*(2) HashAggregate(keys=[_1#3, _2#4], functions=[count(1)])
+- Exchange hashpartitioning(_1#3, _2#4, 200), ENSURE_REQUIREMENTS, [id=#55]
   +- *(1) HashAggregate(keys=[_1#3, _2#4], functions=[partial_count(1)])
      +- *(1) LocalTableScan [_1#3, _2#4]

Collaborator: I've filed an issue for it: #8676

Collaborator (author): Thanks. Is there a corresponding cudf issue?

}
}

override def convertToGpu(): GpuPartitioning =
GpuHashPartitioning(childExprs.map(_.convertToGpu()), hp.numPartitions)
}),
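To make the fallback concrete, here is a spark-shell sketch (data and column names are illustrative, not from the PR) of grouping by an array-of-structs key, the case tagPartForGpu now sends back to the CPU. The resulting plan contains an Exchange hashpartitioning over that key, which is exactly where cudf's missing murmur3 for lists of structs would otherwise be hit:

// Spark-shell sketch: the grouping key is Array[Struct]. The physical plan
// includes "Exchange hashpartitioning(key, ...)"; without the tagPartForGpu
// check above, the GPU shuffle would attempt a murmur3 hash that cudf does
// not implement for lists of structs.
val df = Seq(
  (Seq((1, "a")), 1L),
  (Seq((2, "b")), 2L)
).toDF("key", "value")

df.groupBy("key").count().explain()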
@@ -3817,7 +3830,7 @@ object GpuOverrides extends Logging {
.withPsNote(TypeEnum.STRUCT, "Round-robin partitioning is not supported for nested " +
s"structs if ${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
.withPsNote(
Seq(TypeEnum.ARRAY, TypeEnum.MAP),
Seq(TypeEnum.MAP),
"Round-robin partitioning is not supported if " +
s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true"),
TypeSig.all),
@@ -3879,10 +3892,12 @@ object GpuOverrides extends Logging {
"The backend for hash based aggregations",
ExecChecks(
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 +
TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT)
TypeSig.MAP + TypeSig.STRUCT + TypeSig.ARRAY)
.nested()
.withPsNote(Seq(TypeEnum.ARRAY, TypeEnum.MAP),
.withPsNote(TypeEnum.MAP,
"not allowed for grouping expressions")
.withPsNote(TypeEnum.ARRAY,
"not allowed for grouping expressions if containing Struct as child")
.withPsNote(TypeEnum.STRUCT,
"not allowed for grouping expressions if containing Array or Map as child"),
TypeSig.all),
26 changes: 20 additions & 6 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter}
import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object AggregateUtils {
Expand Down Expand Up @@ -847,13 +847,27 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
groupingExpressions ++ aggregateExpressions ++ aggregateAttributes ++ resultExpressions

override def tagPlanForGpu(): Unit = {
// We don't support Arrays and Maps as GroupBy keys yet, even they are nested in Structs. So,
// We don't support Maps as GroupBy keys yet, even if they are nested in Structs. So,
// we need to run recursive type check on the structs.
val arrayOrMapGroupings = agg.groupingExpressions.exists(e =>
val mapGroupings = agg.groupingExpressions.exists(e =>
TrampolineUtil.dataTypeExistsRecursively(e.dataType,
dt => dt.isInstanceOf[ArrayType] || dt.isInstanceOf[MapType]))
if (arrayOrMapGroupings) {
willNotWorkOnGpu("ArrayTypes or MapTypes in grouping expressions are not supported")
dt => dt.isInstanceOf[MapType]))
if (mapGroupings) {
willNotWorkOnGpu("MapTypes in grouping expressions are not supported")
}

// We support Arrays as grouping expression but not if the child is a struct. So we need to
// run recursive type check on the lists of structs
val arrayWithStructsGroupings = agg.groupingExpressions.exists(e =>
TrampolineUtil.dataTypeExistsRecursively(e.dataType,
dt => dt match {
case ArrayType(_: StructType, _) => true
case _ => false
})
)
if (arrayWithStructsGroupings) {
willNotWorkOnGpu("ArrayTypes with Struct children in grouping expressions are not " +
"supported")
}

tagForReplaceMode()
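Taken together, the two predicates above give the grouping-key rule this PR lands: maps anywhere in the key still fall back, as do arrays whose element type is a struct, while other arrays are now allowed. A standalone Scala sketch of that logic (the real code goes through TrampolineUtil.dataTypeExistsRecursively; this hand-rolled recursion is only illustrative):

import org.apache.spark.sql.types._

// Hand-rolled stand-in for TrampolineUtil.dataTypeExistsRecursively:
// true if any type in the tree satisfies the predicate.
def existsRecursively(dt: DataType)(f: DataType => Boolean): Boolean =
  f(dt) || (dt match {
    case ArrayType(et, _)   => existsRecursively(et)(f)
    case MapType(kt, vt, _) => existsRecursively(kt)(f) || existsRecursively(vt)(f)
    case StructType(fields) => fields.exists(fd => existsRecursively(fd.dataType)(f))
    case _                  => false
  })

// Mirrors the two willNotWorkOnGpu checks: no maps anywhere in the key,
// and no arrays whose element type is a struct.
def gpuSupportedGroupingKey(dt: DataType): Boolean =
  !existsRecursively(dt)(_.isInstanceOf[MapType]) &&
  !existsRecursively(dt) {
    case ArrayType(_: StructType, _) => true
    case _                           => false
  }

gpuSupportedGroupingKey(ArrayType(IntegerType))                                    // true: supported now
gpuSupportedGroupingKey(ArrayType(StructType(Seq(StructField("x", IntegerType))))) // false: falls back
gpuSupportedGroupingKey(MapType(StringType, IntegerType))                          // false: falls back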
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -559,7 +559,7 @@ case class GpuBasicMin(child: Expression) extends GpuMin(child)
*/
case class GpuFloatMin(child: Expression) extends GpuMin(child)
with GpuReplaceWindowFunction {

Collaborator: Please make sure to clean up all files.

override val dataType: DataType = child.dataType match {
case FloatType | DoubleType => child.dataType
case t => throw new IllegalStateException(s"child type $t is not FloatType or DoubleType")
@@ -606,7 +606,7 @@ case class GpuFloatMin(child: Expression) extends GpuMin(child)
// Else return the min value
override lazy val postUpdate: Seq[Expression] = Seq(
GpuIf(
updateAllNansOrNulls.attr,
updateAllNansOrNulls.attr,
GpuIf(
updateHasNan.attr, GpuLiteral(nan, dataType), GpuLiteral(null, dataType)
),
@@ -668,7 +668,7 @@ object GpuMax {
abstract class GpuMax(child: Expression) extends GpuAggregateFunction
with GpuBatchedRunningWindowWithFixer
with GpuAggregateWindowFunction
with GpuRunningWindowFunction
with GpuRunningWindowFunction
with Serializable {
override lazy val initialValues: Seq[GpuLiteral] = Seq(GpuLiteral(null, child.dataType))
override lazy val inputProjection: Seq[Expression] = Seq(child)
@@ -730,7 +730,7 @@ case class GpuBasicMax(child: Expression) extends GpuMax(child)
* column `isNan`. If any value in this column is true, return `Nan`,
* Else, return what `GpuBasicMax` returns.
*/
case class GpuFloatMax(child: Expression) extends GpuMax(child)
case class GpuFloatMax(child: Expression) extends GpuMax(child)
with GpuReplaceWindowFunction{

override val dataType: DataType = child.dataType match {
@@ -756,13 +756,13 @@ case class GpuFloatMax(child: Expression) extends GpuMax(child)
override lazy val updateAggregates: Seq[CudfAggregate] = Seq(updateMaxVal, updateIsNan)
// If there is `Nan` value in the target column, return `Nan`
// else return what the `CudfMax` returns
override lazy val postUpdate: Seq[Expression] =
override lazy val postUpdate: Seq[Expression] =
Seq(
GpuIf(updateIsNan.attr, GpuLiteral(nan, dataType), updateMaxVal.attr)
)

// Same logic as the `inputProjection` stage.
override lazy val preMerge: Seq[Expression] =
override lazy val preMerge: Seq[Expression] =
Seq(evaluateExpression, GpuIsNan(evaluateExpression))
// Same logic as the `updateAggregates` stage.
override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(mergeMaxVal, mergeIsNan)
4 changes: 2 additions & 2 deletions tools/generated_files/supportedExprs.csv
@@ -269,8 +269,8 @@ JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON fr
JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
KnownFloatingPointNormalized,S, ,None,project,input,NA,NA,NA,NA,NA,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
KnownFloatingPointNormalized,S, ,None,project,result,NA,NA,NA,NA,NA,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
KnownFloatingPointNormalized,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownFloatingPointNormalized,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S
KnownNotNull,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS
KnownNotNull,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,NS,S,S,PS,PS,PS,NS
Lag,S,`lag`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,PS,NS