Skip to content

Conversation

@realdengziqi
Copy link
Contributor

Co-authored-by: Lin WanNi linwanniderz@foxmail.com
Co-authored-by: Guo YuanFang 1650213825@qq.com

What is the purpose of the change

https://issues.apache.org/jira/browse/FLINK-26334
about: #18982
The goal of this PR is to fix the bug that: the element couldn't be assigned to the correct window-start, if it's timestamp - offset + windowSize < 0.

This bug located at org.apache.flink.streaming.api.windowing.windows.TimeWindow .

This problem will be triggered by the negative timestamp, and is caused by the calculation method of remainder in the JAVA compiler.

Specifically, when we try to calculate the window-start of an incoming element, if timestamp - offset + windowSize < 0, based on the current calculation formula for window-start, the element will be right shifted to the next window, which has a start time larger than the timestamp of current element, seems violated the assignment principle for elements on window.

image

This problem can be fixed by modifying the calculation formula inside the getWindowStartWithOffset() method as below:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp
            - (timestamp - offset) % windowSize
            - (windowSize & (timestamp - offset) >> 63);
}

After this modify, for the element who has negative timestamp, we can still get the correct window-start. Like the below graph showing:
image

The getWindowStartWithOffset() method in other package

Considered the common usage of this method, we checked out the other getWindowStartWithOffset() method in the project, found one in the org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping

Turn out this method correctly handled the negative timestamp situation. Below is the source code.

private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // handle both positive and negative cases
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

further

When we wrote the test case, we found that the algorithm we wrote would violate the convention that the window is closed on the left and open on the right. In addition, considering the readability of the code, we decided to use the same code as in org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping.

private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0){
         return timestamp - (remainder + windowSize);
     }else {
         return timestamp - remainder;
     }
} 

In addition, in the process of modification, we found that the algorithm of getWindowStartWithOffset in org.apache.flink.table.runtime.operators.window.TimeWindow is the same as that in org.apache.flink.streaming.api.windowing.windows.TimeWindow. So it should cause the same problem. I think it should also be modified to support negative timestamps

Brief change log

  • Fix getWindowStartWithOffset in TimeWindow.java

Verifying this change

This change is already covered by existing tests, such as the tests in the flink-streaming-java [mvn clean verify]

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

…set method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow . Added test cases in unit test TimeWindowTest.

Co-authored-by: Lin WanNi <linwanniderz@foxmail.com>
Co-authored-by: Guo YuanFang <1650213825@qq.com>
@flinkbot
Copy link
Collaborator

flinkbot commented Mar 18, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@realdengziqi
Copy link
Contributor Author

@fapaul hi , paul. This is a backport PR for the release-1.14 branch. will u help have a look and merge it if it's ok?

@fapaul fapaul merged commit fc86673 into apache:release-1.14 Mar 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants