diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index eba05a862dd..5a9c6288036 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -31,16 +31,26 @@
     ('b', IntegerGen()),
     ('c', IntegerGen())]
 
-_grpkey_longs_with_timestamps = [
+_grpkey_longs_with_dates = [
     ('a', RepeatSeqGen(LongGen(), length=2048)),
     ('b', DateGen(nullable=False, start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
     ('c', IntegerGen())]
 
-_grpkey_longs_with_nullable_timestamps = [
+_grpkey_longs_with_nullable_dates = [
     ('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
     ('b', DateGen(nullable=(True, 5.0), start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
     ('c', IntegerGen())]
 
+_grpkey_longs_with_timestamps = [
+    ('a', RepeatSeqGen(LongGen(), length=2048)),
+    ('b', TimestampGen(nullable=False)),
+    ('c', IntegerGen())]
+
+_grpkey_longs_with_nullable_timestamps = [
+    ('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
+    ('b', TimestampGen(nullable=(True, 5.0))),
+    ('c', IntegerGen())]
+
 _grpkey_longs_with_decimals = [
     ('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
     ('b', DecimalGen(precision=18, scale=3, nullable=False)),
@@ -59,19 +69,32 @@
 
 _grpkey_byte_with_nulls = [
     ('a', RepeatSeqGen(int_gen, length=20)),
-    ('b', ByteGen(nullable=True, min_val=-98, max_val=98, special_cases=[]))]
+    # restrict min_val/max_val so the values do not overflow when aggregating
+    ('b', ByteGen(nullable=True, min_val=-98, max_val=98, special_cases=[])),
+    ('c', IntegerGen())]
 
 _grpkey_short_with_nulls = [
     ('a', RepeatSeqGen(int_gen, length=20)),
-    ('b', ShortGen(nullable=True, min_val=-32700, max_val=32700, special_cases=[]))]
+    # restrict min_val/max_val so the values do not overflow when aggregating
+    ('b', ShortGen(nullable=True, min_val=-32700, max_val=32700, special_cases=[])),
+    ('c', IntegerGen())]
 
 _grpkey_int_with_nulls = [
     ('a', RepeatSeqGen(int_gen, length=20)),
-    ('b', IntegerGen(nullable=True, min_val=-2147483000, max_val=2147483000, special_cases=[]))]
+    # restrict min_val/max_val so the values do not overflow when aggregating
+    ('b', IntegerGen(nullable=True, min_val=-2147483000, max_val=2147483000, special_cases=[])),
+    ('c', IntegerGen())]
 
 _grpkey_long_with_nulls = [
     ('a', RepeatSeqGen(int_gen, length=20)),
-    ('b', LongGen(nullable=True, min_val=-9223372036854775000, max_val=9223372036854775000, special_cases=[]))]
+    # restrict min_val/max_val so the values do not overflow when aggregating
+    ('b', LongGen(nullable=True, min_val=-9223372036854775000, max_val=9223372036854775000, special_cases=[])),
+    ('c', IntegerGen())]
+
+_grpkey_date_with_nulls = [
+    ('a', RepeatSeqGen(int_gen, length=20)),
+    ('b', DateGen(nullable=(True, 5.0), start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
+    ('c', IntegerGen())]
 
 _grpkey_byte_with_nulls_with_overflow = [
     ('a', IntegerGen()),
@@ -158,41 +181,42 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
                                       _grpkey_byte_with_nulls,
                                       _grpkey_short_with_nulls,
                                       _grpkey_int_with_nulls,
-                                      _grpkey_long_with_nulls
+                                      _grpkey_long_with_nulls,
+                                      _grpkey_date_with_nulls,
                                       ], ids=idfn)
-def test_window_aggs_for_range_numeric(data_gen, batch_size):
+def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
     conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
             'spark.rapids.sql.window.range.byte.enabled': True,
             'spark.rapids.sql.window.range.short.enabled': True}
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark: gen_df(spark, data_gen, length=2048),
-        "window_agg_table",
+        'window_agg_table',
         'select '
-        ' sum(b) over '
+        ' sum(c) over '
         ' (partition by a order by b asc '
        ' range between 1 preceding and 3 following) as sum_c_asc, '
-        ' avg(b) over '
+        ' avg(c) over '
         ' (partition by a order by b asc '
        ' range between 1 preceding and 3 following) as avg_b_asc, '
-        ' max(b) over '
+        ' max(c) over '
         ' (partition by a order by b asc '
        ' range between 1 preceding and 3 following) as max_b_desc, '
-        ' min(b) over '
+        ' min(c) over '
         ' (partition by a order by b asc '
        ' range between 1 preceding and 3 following) as min_b_asc, '
         ' count(1) over '
         ' (partition by a order by b asc '
        ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
-        ' count(b) over '
+        ' count(c) over '
         ' (partition by a order by b asc '
        ' range between CURRENT ROW and UNBOUNDED following) as count_b_asc, '
-        ' avg(b) over '
+        ' avg(c) over '
         ' (partition by a order by b asc '
        ' range between UNBOUNDED preceding and CURRENT ROW) as avg_b_unbounded, '
-        ' sum(b) over '
+        ' sum(c) over '
         ' (partition by a order by b asc '
        ' range between UNBOUNDED preceding and CURRENT ROW) as sum_b_unbounded, '
-        ' max(b) over '
+        ' max(c) over '
         ' (partition by a order by b asc '
        ' range between UNBOUNDED preceding and UNBOUNDED following) as max_b_unbounded '
         'from window_agg_table ',
@@ -202,8 +226,8 @@ def test_window_aggs_for_range_numeric(data_gen, batch_size):
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
 @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
                                       _grpkey_longs_with_nulls,
-                                      _grpkey_longs_with_timestamps,
-                                      _grpkey_longs_with_nullable_timestamps,
+                                      _grpkey_longs_with_dates,
+                                      _grpkey_longs_with_nullable_dates,
                                       _grpkey_longs_with_decimals,
                                       _grpkey_longs_with_nullable_decimals,
                                       _grpkey_decimals_with_nulls], ids=idfn)
@@ -348,62 +372,49 @@ def do_it(spark):
 
 # Test for RANGE queries, with timestamp order-by expressions.
-# Non-timestamp order-by columns are currently unsupported for RANGE queries.
-# See https://github.com/NVIDIA/spark-rapids/issues/216
 @ignore_order
 @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
                                       pytest.param(_grpkey_longs_with_nullable_timestamps)], ids=idfn)
-def test_window_aggs_for_ranges(data_gen):
+def test_window_aggs_for_ranges_timestamps(data_gen):
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark: gen_df(spark, data_gen, length=2048),
         "window_agg_table",
         'select '
         ' sum(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
-        ' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
+        ' (partition by a order by b asc '
+        ' range between interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
+        '   and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as sum_c_asc, '
         ' avg(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
-        ' range between interval 1 day preceding and interval 1 day following) as avg_c_asc, '
+        ' (partition by a order by b asc '
+        ' range between interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
+        '   and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as avg_c_asc, '
         ' max(c) over '
-        ' (partition by a order by cast(b as timestamp) desc '
-        ' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
+        ' (partition by a order by b desc '
+        ' range between interval 2 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
+        '   and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as max_c_desc, '
         ' min(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
-        ' range between interval 2 days preceding and current row) as min_c_asc, '
+        ' (partition by a order by b asc '
+        ' range between interval 2 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
+        '   and current row) as min_c_asc, '
         ' count(1) over '
-        ' (partition by a order by cast(b as timestamp) asc '
+        ' (partition by a order by b asc '
         ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
         ' count(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
+        ' (partition by a order by b asc '
         ' range between CURRENT ROW and UNBOUNDED following) as count_c_asc, '
         ' avg(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
+        ' (partition by a order by b asc '
         ' range between UNBOUNDED preceding and CURRENT ROW) as avg_c_unbounded, '
         ' sum(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
+        ' (partition by a order by b asc '
         ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
         ' max(c) over '
-        ' (partition by a order by cast(b as timestamp) asc '
+        ' (partition by a order by b asc '
         ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
         'from window_agg_table',
         conf = {'spark.rapids.sql.castFloatToDecimal.enabled': True})
 
-@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns "
-                          "(https://github.com/NVIDIA/spark-rapids/issues/216)")
-@ignore_order
-@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn)
-def test_window_aggs_for_ranges_of_dates(data_gen):
-    assert_gpu_and_cpu_are_equal_sql(
-        lambda spark: gen_df(spark, data_gen, length=2048),
-        "window_agg_table",
-        'select '
-        ' sum(c) over '
-        ' (partition by a order by b asc '
-        ' range between 1 preceding and 1 following) as sum_c_asc '
-        'from window_agg_table'
-    )
-
 _gen_data_for_collect = [
     ('a', RepeatSeqGen(LongGen(), length=20)),
     ('b', IntegerGen()),
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 142bfdc0a58..a308a13453b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -16,7 +16,8 @@
 
 package com.nvidia.spark.rapids
 
-import scala.concurrent.duration.{DAYS, Duration}
+import java.util.concurrent.TimeUnit
+
 import scala.language.{existentials, implicitConversions}
 
 import ai.rapids.cudf.{Aggregation, AggregationOnColumn, ColumnVector, DType, RollingAggregation, Scalar, WindowOptions}
@@ -143,10 +144,10 @@ class GpuWindowExpressionMeta(
       Seq(spec.lower, spec.upper).foreach {
         case l @ Literal(_, ByteType | ShortType | IntegerType | LongType) =>
           checkRangeBoundaryConfig(l.dataType)
-        case Literal(value, CalendarIntervalType) =>
-          val ci = value.asInstanceOf[CalendarInterval]
-          if (ci.months != 0 || ci.microseconds != 0) {
-            willNotWorkOnGpu("only days are supported for window range intervals")
+        case Literal(ci: CalendarInterval, CalendarIntervalType) =>
+          // Intervals are only supported for TimestampType order-by columns
+          if (ci.months != 0) {
+            willNotWorkOnGpu("interval months isn't supported")
           }
         case UnboundedFollowing | UnboundedPreceding | CurrentRow =>
         case anythings =>
@@ -416,15 +417,15 @@ object GpuWindowExpression extends Arm {
    * @return Scalar holding boundary value
    */
   def createRangeWindowBoundary(orderByType: DType, value: Long): Scalar = {
-    implicit def daysToDuration(days: Long) = Duration(value, DAYS)
     orderByType match {
       case DType.INT8 => Scalar.fromByte(value.toByte)
       case DType.INT16 => Scalar.fromShort(value.toShort)
      case DType.INT32 => Scalar.fromInt(value.toInt)
       case DType.INT64 => Scalar.fromLong(value)
-      case DType.TIMESTAMP_DAYS => Scalar.durationFromLong(DType.DURATION_DAYS, value.toDays)
+      // Intervals are not supported for DateType, so the value is already a day count
+      case DType.TIMESTAMP_DAYS => Scalar.durationFromLong(DType.DURATION_DAYS, value)
       case DType.TIMESTAMP_MICROSECONDS =>
-        Scalar.durationFromLong(DType.DURATION_MICROSECONDS, value.toMicros)
+        Scalar.durationFromLong(DType.DURATION_MICROSECONDS, value)
       case _ => throw new RuntimeException(s"Not supported order by type, Found $orderByType")
     }
   }
@@ -443,10 +444,10 @@ object GpuWindowExpression extends Arm {
         val isUnBounded = special.isUnBounded
         (isUnBounded, if (isUnBounded) None else orderByType.map(
           createRangeWindowBoundary(_, special.value)))
-      case GpuLiteral(value, CalendarIntervalType) =>
-        // TimeStampDays -> DurationDays
-        var x = value.asInstanceOf[CalendarInterval].days
-        if (x == Int.MinValue) x = Int.MaxValue
+      case GpuLiteral(ci: CalendarInterval, CalendarIntervalType) =>
+        // Convert the interval to total microseconds for TIMESTAMP_MICROSECONDS
+        var x = ci.days * TimeUnit.DAYS.toMicros(1) + ci.microseconds
+        if (x == Long.MinValue) x = Long.MaxValue
         (false, orderByType.map(createRangeWindowBoundary(_, Math.abs(x))))
       case GpuLiteral(value, ByteType) =>
         var x = value.asInstanceOf[Byte]
@@ -600,16 +601,24 @@ class GpuSpecifiedWindowFrameMeta(
             case Literal(value, ShortType) => value.asInstanceOf[Short].toLong
             case Literal(value, IntegerType) => value.asInstanceOf[Int].toLong
             case Literal(value, LongType) => value.asInstanceOf[Long]
-            case Literal(value, CalendarIntervalType) =>
-              val interval = value.asInstanceOf[CalendarInterval]
-              if (interval.microseconds != 0 || interval.months != 0) { // DAYS == 0 is permitted.
-                willNotWorkOnGpu(s"Bounds for Range-based window frames must be specified" +
-                  s" only in DAYS. Found $interval")
+            case Literal(ci: CalendarInterval, CalendarIntervalType) =>
+              if (ci.months != 0) {
+                willNotWorkOnGpu("interval months isn't supported")
+              }
+              // return the total microseconds
+              try {
+                Math.addExact(
+                  Math.multiplyExact(ci.days.toLong, TimeUnit.DAYS.toMicros(1)),
+                  ci.microseconds)
+              } catch {
+                case _: ArithmeticException =>
+                  willNotWorkOnGpu("windows over timestamps are converted to microseconds " +
+                    s"and $ci is too large to fit")
+                  if (isLower) -1 else 1 // not check again
               }
-              interval.days.toLong
             case _ =>
               willNotWorkOnGpu(s"Bounds for Range-based window frames must be specified in Integral" +
-                s" type (Boolean exclusive) or DAYS. Found ${bounds.dataType}")
+                s" type (Boolean exclusive) or CalendarInterval. Found ${bounds.dataType}")
               if (isLower) -1 else 1 // not check again
           }
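
Note: the boundary conversion used in the two hunks above can be sanity-checked in isolation. Below is a minimal standalone sketch of the same day-plus-time interval to total-microseconds conversion, including the addExact/multiplyExact overflow check that triggers the CPU fallback; the object and helper names are illustrative and are not part of the patch.

import java.util.concurrent.TimeUnit

import org.apache.spark.unsafe.types.CalendarInterval

object RangeBoundarySketch {
  // Total microseconds represented by a day/time interval. Months are assumed to have
  // been rejected already, mirroring the willNotWorkOnGpu check in the patch; an
  // ArithmeticException here corresponds to the "too large to fit" fallback.
  def intervalToMicros(ci: CalendarInterval): Long =
    Math.addExact(
      Math.multiplyExact(ci.days.toLong, TimeUnit.DAYS.toMicros(1)),
      ci.microseconds)

  def main(args: Array[String]): Unit = {
    // INTERVAL 1 DAY 5 HOUR -> 86400000000 + 18000000000 = 104400000000 microseconds
    val oneDayFiveHours = new CalendarInterval(0, 1, TimeUnit.HOURS.toMicros(5))
    println(intervalToMicros(oneDayFiveHours))
  }
}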