Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
override def write(index: Int, current: InternalRow): Unit = {
var bufferUpdated = index == 0

// Add all rows to the buffer for which the input row value is equal to or less than
// the output row upper bound.
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
buffer.add(nextRow.copy())
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex += 1
bufferUpdated = true
}

// Drop all rows from the buffer for which the input row value is smaller than
// the output row lower bound.
while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
Expand All @@ -212,6 +203,19 @@ private[window] final class SlidingWindowFunctionFrame(
bufferUpdated = true
}

// Add all rows to the buffer for which the input row value is equal to or less than
// the output row upper bound.
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

     while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
       if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
         inputLowIndex += 1
       } else {
         buffer.add(nextRow.copy())
         bufferUpdated = true
       }
       nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
       inputHighIndex += 1
     }

?

if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
inputLowIndex += 1
} else {
buffer.add(nextRow.copy())
bufferUpdated = true
}
nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
inputHighIndex += 1
}

// Only recalculate and update when the buffer changes.
if (bufferUpdated) {
processor.initialize(input.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
spark.catalog.dropTempView("nums")
}

test("window function: mutiple window expressions specified by range in a single expression") {
val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
nums.createOrReplaceTempView("nums")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap your test with withTempView, which can drop the view automatically.

Copy link
Contributor

@cloud-fan cloud-fan Jul 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW this test is not very related to this PR, just improves test coverage for range window frame.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this test case doesn't cover when CurrentRow is not in the window frame. We'd better add that senario.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add it later today.

withTempView("nums") {
val expected =
Row(1, 1, 1, 4, null, 8, 25) ::
Row(1, 3, 4, 9, 1, 12, 24) ::
Row(1, 5, 9, 15, 4, 16, 21) ::
Row(1, 7, 16, 21, 8, 9, 16) ::
Row(1, 9, 25, 16, 12, null, 9) ::
Row(0, 2, 2, 6, null, 10, 30) ::
Row(0, 4, 6, 12, 2, 14, 28) ::
Row(0, 6, 12, 18, 6, 18, 24) ::
Row(0, 8, 20, 24, 10, 10, 18) ::
Row(0, 10, 30, 18, 14, null, 10) ::
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, please make sure there is no behavior change, i.e. the result should be same with or without this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expected is calculated manually. This test is to verify there is no behavior change.

Nil

val actual = sql(
"""
|SELECT
| y,
| x,
| sum(x) over w1 as history_sum,
| sum(x) over w2 as period_sum1,
| sum(x) over w3 as period_sum2,
| sum(x) over w4 as period_sum3,
| sum(x) over w5 as future_sum
|FROM nums
|WINDOW
| w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
| w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING),
| w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING ),
| w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING),
| w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
""".stripMargin
)
checkAnswer(actual, expected)
}
}

test("SPARK-7595: Window will cause resolve failed with self join") {
checkAnswer(sql(
"""
Expand Down