Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support interval.microseconds for range window TimeStampType #2525

Merged
merged 3 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 61 additions & 50 deletions integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,26 @@
('b', IntegerGen()),
('c', IntegerGen())]

_grpkey_longs_with_timestamps = [
_grpkey_longs_with_dates = [
('a', RepeatSeqGen(LongGen(), length=2048)),
('b', DateGen(nullable=False, start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
('c', IntegerGen())]

_grpkey_longs_with_nullable_timestamps = [
_grpkey_longs_with_nullable_dates = [
('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
('b', DateGen(nullable=(True, 5.0), start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
('c', IntegerGen())]

_grpkey_longs_with_timestamps = [
('a', RepeatSeqGen(LongGen(), length=2048)),
('b', TimestampGen(nullable=False)),
('c', IntegerGen())]

_grpkey_longs_with_nullable_timestamps = [
('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
('b', TimestampGen(nullable=(True, 5.0))),
('c', IntegerGen())]

_grpkey_longs_with_decimals = [
('a', RepeatSeqGen(LongGen(nullable=False), length=20)),
('b', DecimalGen(precision=18, scale=3, nullable=False)),
Expand All @@ -59,19 +69,32 @@

_grpkey_byte_with_nulls = [
('a', RepeatSeqGen(int_gen, length=20)),
('b', ByteGen(nullable=True, min_val=-98, max_val=98, special_cases=[]))]
# restrict the values generated by min_val/max_val not to be overflow when calculating
('b', ByteGen(nullable=True, min_val=-98, max_val=98, special_cases=[])),
('c', IntegerGen())]

_grpkey_short_with_nulls = [
('a', RepeatSeqGen(int_gen, length=20)),
('b', ShortGen(nullable=True, min_val=-32700, max_val=32700, special_cases=[]))]
# restrict the values generated by min_val/max_val not to be overflow when calculating
('b', ShortGen(nullable=True, min_val=-32700, max_val=32700, special_cases=[])),
('c', IntegerGen())]

_grpkey_int_with_nulls = [
('a', RepeatSeqGen(int_gen, length=20)),
('b', IntegerGen(nullable=True, min_val=-2147483000, max_val=2147483000, special_cases=[]))]
# restrict the values generated by min_val/max_val not to be overflow when calculating
('b', IntegerGen(nullable=True, min_val=-2147483000, max_val=2147483000, special_cases=[])),
('c', IntegerGen())]

_grpkey_long_with_nulls = [
('a', RepeatSeqGen(int_gen, length=20)),
('b', LongGen(nullable=True, min_val=-9223372036854775000, max_val=9223372036854775000, special_cases=[]))]
# restrict the values generated by min_val/max_val not to be overflow when calculating
('b', LongGen(nullable=True, min_val=-9223372036854775000, max_val=9223372036854775000, special_cases=[])),
('c', IntegerGen())]

_grpkey_date_with_nulls = [
('a', RepeatSeqGen(int_gen, length=20)),
('b', DateGen(nullable=(True, 5.0), start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31))),
('c', IntegerGen())]

_grpkey_byte_with_nulls_with_overflow = [
('a', IntegerGen()),
Expand Down Expand Up @@ -158,41 +181,42 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
_grpkey_byte_with_nulls,
_grpkey_short_with_nulls,
_grpkey_int_with_nulls,
_grpkey_long_with_nulls
_grpkey_long_with_nulls,
_grpkey_date_with_nulls,
], ids=idfn)
def test_window_aggs_for_range_numeric(data_gen, batch_size):
def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.window.range.byte.enabled': True,
'spark.rapids.sql.window.range.short.enabled': True}
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
"window_agg_table",
'window_agg_table',
'select '
' sum(b) over '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 3 following) as sum_c_asc, '
' avg(b) over '
' avg(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 3 following) as avg_b_asc, '
' max(b) over '
' max(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 3 following) as max_b_desc, '
' min(b) over '
' min(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 3 following) as min_b_asc, '
' count(1) over '
' (partition by a order by b asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' count(b) over '
' count(c) over '
' (partition by a order by b asc '
' range between CURRENT ROW and UNBOUNDED following) as count_b_asc, '
' avg(b) over '
' avg(c) over '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and CURRENT ROW) as avg_b_unbounded, '
' sum(b) over '
' sum(c) over '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_b_unbounded, '
' max(b) over '
' max(c) over '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_b_unbounded '
'from window_agg_table ',
Expand All @@ -202,8 +226,8 @@ def test_window_aggs_for_range_numeric(data_gen, batch_size):
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
_grpkey_longs_with_nulls,
_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps,
_grpkey_longs_with_dates,
_grpkey_longs_with_nullable_dates,
_grpkey_longs_with_decimals,
_grpkey_longs_with_nullable_decimals,
_grpkey_decimals_with_nulls], ids=idfn)
Expand Down Expand Up @@ -348,62 +372,49 @@ def do_it(spark):


# Test for RANGE queries, with timestamp order-by expressions.
# Non-timestamp order-by columns are currently unsupported for RANGE queries.
# See https://github.com/NVIDIA/spark-rapids/issues/216
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
pytest.param(_grpkey_longs_with_nullable_timestamps)],
ids=idfn)
def test_window_aggs_for_ranges(data_gen):
def test_window_aggs_for_ranges_timestamps(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
' (partition by a order by b asc '
' range between interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
' and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as sum_c_asc, '
' avg(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 1 day preceding and interval 1 day following) as avg_c_asc, '
' (partition by a order by b asc '
' range between interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
' and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as avg_c_asc, '
' max(c) over '
' (partition by a order by cast(b as timestamp) desc '
' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
' (partition by a order by b desc '
' range between interval 2 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
' and interval 1 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND following) as max_c_desc, '
' min(c) over '
' (partition by a order by cast(b as timestamp) asc '
' range between interval 2 days preceding and current row) as min_c_asc, '
' (partition by a order by b asc '
' range between interval 2 DAY 5 HOUR 3 MINUTE 2 SECOND 1 MILLISECOND 5 MICROSECOND preceding '
' and current row) as min_c_asc, '
' count(1) over '
' (partition by a order by cast(b as timestamp) asc '
' (partition by a order by b asc '
' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
' count(c) over '
' (partition by a order by cast(b as timestamp) asc '
' (partition by a order by b asc '
' range between CURRENT ROW and UNBOUNDED following) as count_c_asc, '
' avg(c) over '
' (partition by a order by cast(b as timestamp) asc '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and CURRENT ROW) as avg_c_unbounded, '
' sum(c) over '
' (partition by a order by cast(b as timestamp) asc '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
' max(c) over '
' (partition by a order by cast(b as timestamp) asc '
' (partition by a order by b asc '
' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
'from window_agg_table',
conf = {'spark.rapids.sql.castFloatToDecimal.enabled': True})

@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns "
"(https://github.com/NVIDIA/spark-rapids/issues/216)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn)
def test_window_aggs_for_ranges_of_dates(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
"window_agg_table",
'select '
' sum(c) over '
' (partition by a order by b asc '
' range between 1 preceding and 1 following) as sum_c_asc '
'from window_agg_table'
)

_gen_data_for_collect = [
('a', RepeatSeqGen(LongGen(), length=20)),
('b', IntegerGen()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package com.nvidia.spark.rapids

import scala.concurrent.duration.{DAYS, Duration}
import java.util.concurrent.TimeUnit

import scala.language.{existentials, implicitConversions}

import ai.rapids.cudf.{Aggregation, AggregationOnColumn, ColumnVector, DType, RollingAggregation, Scalar, WindowOptions}
Expand Down Expand Up @@ -143,10 +144,10 @@ class GpuWindowExpressionMeta(
Seq(spec.lower, spec.upper).foreach {
case l @ Literal(_, ByteType | ShortType | IntegerType | LongType) =>
checkRangeBoundaryConfig(l.dataType)
case Literal(value, CalendarIntervalType) =>
val ci = value.asInstanceOf[CalendarInterval]
if (ci.months != 0 || ci.microseconds != 0) {
willNotWorkOnGpu("only days are supported for window range intervals")
case Literal(ci: CalendarInterval, CalendarIntervalType) =>
// interval is only working for TimeStampType
if (ci.months != 0) {
willNotWorkOnGpu("interval months isn't supported")
}
case UnboundedFollowing | UnboundedPreceding | CurrentRow =>
case anythings =>
Expand Down Expand Up @@ -416,15 +417,15 @@ object GpuWindowExpression extends Arm {
* @return Scalar holding boundary value
*/
def createRangeWindowBoundary(orderByType: DType, value: Long): Scalar = {
implicit def daysToDuration(days: Long) = Duration(value, DAYS)
orderByType match {
case DType.INT8 => Scalar.fromByte(value.toByte)
case DType.INT16 => Scalar.fromShort(value.toShort)
case DType.INT32 => Scalar.fromInt(value.toInt)
case DType.INT64 => Scalar.fromLong(value)
case DType.TIMESTAMP_DAYS => Scalar.durationFromLong(DType.DURATION_DAYS, value.toDays)
// Interval is not working for DateType
case DType.TIMESTAMP_DAYS => Scalar.durationFromLong(DType.DURATION_DAYS, value)
case DType.TIMESTAMP_MICROSECONDS =>
Scalar.durationFromLong(DType.DURATION_MICROSECONDS, value.toMicros)
Scalar.durationFromLong(DType.DURATION_MICROSECONDS, value)
case _ => throw new RuntimeException(s"Not supported order by type, Found $orderByType")
}
}
Expand All @@ -443,10 +444,10 @@ object GpuWindowExpression extends Arm {
val isUnBounded = special.isUnBounded
(isUnBounded, if (isUnBounded) None else orderByType.map(
createRangeWindowBoundary(_, special.value)))
case GpuLiteral(value, CalendarIntervalType) =>
// TimeStampDays -> DurationDays
var x = value.asInstanceOf[CalendarInterval].days
if (x == Int.MinValue) x = Int.MaxValue
case GpuLiteral(ci: CalendarInterval, CalendarIntervalType) =>
// Get the total microseconds for TIMESTAMP_MICROSECONDS
var x = ci.days * TimeUnit.DAYS.toMicros(1) + ci.microseconds
if (x == Long.MinValue) x = Long.MaxValue
(false, orderByType.map(createRangeWindowBoundary(_, Math.abs(x))))
case GpuLiteral(value, ByteType) =>
var x = value.asInstanceOf[Byte]
Expand Down Expand Up @@ -600,16 +601,24 @@ class GpuSpecifiedWindowFrameMeta(
case Literal(value, ShortType) => value.asInstanceOf[Short].toLong
case Literal(value, IntegerType) => value.asInstanceOf[Int].toLong
case Literal(value, LongType) => value.asInstanceOf[Long]
case Literal(value, CalendarIntervalType) =>
val interval = value.asInstanceOf[CalendarInterval]
if (interval.microseconds != 0 || interval.months != 0) { // DAYS == 0 is permitted.
willNotWorkOnGpu(s"Bounds for Range-based window frames must be specified" +
s" only in DAYS. Found $interval")
case Literal(ci: CalendarInterval, CalendarIntervalType) =>
if (ci.months != 0) {
willNotWorkOnGpu("interval months isn't supported")
}
// return the total microseconds
try {
Math.addExact(
Math.multiplyExact(ci.days.toLong, TimeUnit.DAYS.toMicros(1)),
ci.microseconds)
} catch {
case _: ArithmeticException =>
willNotWorkOnGpu("windows over timestamps are converted to microseconds " +
s"and $ci is too large to fit")
if (isLower) -1 else 1 // not check again
}
interval.days.toLong
case _ =>
willNotWorkOnGpu(s"Bounds for Range-based window frames must be specified in Integral" +
s" type (Boolean exclusive) or DAYS. Found ${bounds.dataType}")
s" type (Boolean exclusive) or CalendarInterval. Found ${bounds.dataType}")
if (isLower) -1 else 1 // not check again
}

Expand Down