diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0390131172bb6..ba7c39f9db571 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3843,8 +3843,8 @@ object TimeWindowing extends Rule[LogicalPlan] { * The windows are calculated as below: * maxNumOverlapping <- ceil(windowDuration / slideDuration) * for (i <- 0 until maxNumOverlapping) - * windowId <- ceil((timestamp - startTime) / slideDuration) - * windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime + * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration + * windowStart <- lastStart - i * slideDuration * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd * @@ -3884,14 +3884,11 @@ object TimeWindowing extends Rule[LogicalPlan] { case _ => Metadata.empty } - def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = { - val division = (PreciseTimestampConversion( - window.timeColumn, dataType, LongType) - window.startTime) / window.slideDuration - val ceil = Ceil(division) - // if the division is equal to the ceiling, our record is the start of a window - val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil)) - val windowStart = (windowId + i - overlappingWindows) * - window.slideDuration + window.startTime + def getWindow(i: Int, dataType: DataType): Expression = { + val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) + val lastStart = timestamp - (timestamp - window.startTime + + window.slideDuration) % window.slideDuration + val windowStart = lastStart - i * window.slideDuration val windowEnd = windowStart + window.windowDuration CreateNamedStruct( @@ -3906,7 +3903,7 @@ object TimeWindowing extends Rule[LogicalPlan] { WINDOW_COL_NAME, window.dataType, metadata = metadata)() if (window.windowDuration == window.slideDuration) { - val windowStruct = Alias(getWindow(0, 1, window.timeColumn.dataType), WINDOW_COL_NAME)( + val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)( exprId = windowAttr.exprId, explicitMetadata = Some(metadata)) val replacedPlan = p transformExpressions { @@ -3924,7 +3921,7 @@ object TimeWindowing extends Rule[LogicalPlan] { math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt val windows = Seq.tabulate(overlappingWindows)(i => - getWindow(i, overlappingWindows, window.timeColumn.dataType)) + getWindow(i, window.timeColumn.dataType)) val projections = windows.map(_ +: child.output)