diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 3346b3978590b..93c1bf16f6bf1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -262,6 +262,12 @@ public int compare(TimeWindow o1, TimeWindow o2) { * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { - return timestamp - (timestamp - offset + windowSize) % windowSize; + final long remainder = (timestamp - offset) % windowSize; + // handle both positive and negative cases + if (remainder < 0) { + return timestamp - (remainder + windowSize); + } else { + return timestamp - remainder; + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java index 57a4367851496..0be5ed7a8c55b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java @@ -29,23 +29,38 @@ public class TimeWindowTest { @Test public void testGetWindowStartWithOffset() { - // [0, 7), [7, 14), [14, 21)... + // [-21, -14), [-14, -7), [-7, 0), [0, 7), [7, 14), [14, 21)... long offset = 0; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-8, offset, 7), -14); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-6, offset, 7), -7); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -7); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7); - // [-4, 3), [3, 10), [10, 17)... + // [-11, -4), [-4, 3), [3, 10), [10, 17)... offset = 3; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-10, offset, 7), -11); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-9, offset, 7), -11); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -4); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -4); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10); - // [-2, 5), [5, 12), [12, 19)... + // [-16, -9), [-9, -2), [-2, 5), [5, 12), [12, 19)... offset = -2; + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-12, offset, 7), -16); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -9); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-4, offset, 7), -9); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -9); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -2); + Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -2); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2); Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java index e358c994d8ea4..11e2ed3fd1887 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java @@ -220,7 +220,13 @@ public TimeWindowSerializerSnapshot() { * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { - return timestamp - (timestamp - offset + windowSize) % windowSize; + final long remainder = (timestamp - offset) % windowSize; + // handle both positive and negative cases + if (remainder < 0) { + return timestamp - (remainder + windowSize); + } else { + return timestamp - remainder; + } } public static TimeWindow of(long start, long end) {