@realdengziqi
Contributor

@realdengziqi realdengziqi commented Mar 4, 2022

Co-authored-by: Lin WanNi linwanniderz@foxmail.com
Co-authored-by: Guo YuanFang 1650213825@qq.com

What is the purpose of the change

https://issues.apache.org/jira/browse/FLINK-26334
This PR fixes a bug in which an element is not assigned to the correct window start when timestamp - offset + windowSize < 0.

The bug is in org.apache.flink.streaming.api.windowing.windows.TimeWindow.

The problem is triggered by negative timestamps and is caused by the way Java's remainder operator (%) works: the result takes the sign of the dividend, so a negative dividend produces a negative (or zero) remainder.

Specifically, when we calculate the window start of an incoming element and timestamp - offset + windowSize < 0, the current formula shifts the element right into the next window, whose start time is later than the element's own timestamp. This violates the principle that a window must contain the elements assigned to it.
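
A minimal sketch of the failure, using the pre-fix formula that appears in this PR's diff. The class name, method name, and sample values are illustrative only and are not part of Flink.

```java
/** Illustrative only: reproduces the pre-fix window-start formula outside Flink. */
class OldWindowStartDemo {

    // The formula as it stood before this PR.
    static long oldWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Java's % keeps the sign of the dividend.
        System.out.println(-1 % 7); // -1

        // timestamp - offset + windowSize = -8 - 0 + 7 = -1 < 0, so the remainder
        // is -1 and the computed start (-7) is later than the element (-8).
        System.out.println(oldWindowStart(-8, 0, 7)); // -7

        // timestamp - offset + windowSize = -1 - 0 + 7 = 6 >= 0, so the result is fine.
        System.out.println(oldWindowStart(-1, 0, 7)); // -7
    }
}
```

With a window size of 7 and no offset, the element at -8 belongs to the window starting at -14, not -7.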

image

This problem can be fixed by modifying the calculation formula inside the getWindowStartWithOffset() method as below:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp
            - (timestamp - offset) % windowSize
            - (windowSize & (timestamp - offset) >> 63);
}
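
Because >> binds more tightly than & in Java, the last term is evaluated as windowSize & ((timestamp - offset) >> 63). The sketch below isolates that term to show what it contributes; the class and method names are illustrative and not part of Flink.

```java
/** Illustrative only: isolates the sign-mask term from the proposed formula. */
class SignMaskDemo {

    static long correction(long timestamp, long offset, long windowSize) {
        // Arithmetic shift by 63 copies the sign bit into every position:
        // -1 (all bits set) for a negative value, 0 otherwise.
        long signMask = (timestamp - offset) >> 63;
        // ANDing with the mask yields windowSize for negative inputs, else 0.
        return windowSize & signMask;
    }

    public static void main(String[] args) {
        System.out.println(correction(-1, 0, 7)); // 7
        System.out.println(correction(5, 0, 7));  // 0
        // Same value as the unparenthesized expression in the proposal.
        System.out.println((7L & (-1L - 0L) >> 63) == correction(-1, 0, 7)); // true
    }
}
```

The term therefore subtracts one full windowSize whenever timestamp - offset is negative, regardless of the value of the remainder.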

With this change, elements with negative timestamps still get the correct window start, as the figure below shows:
image

The getWindowStartWithOffset() method in another package

Because this method is commonly used, we looked for other getWindowStartWithOffset() methods in the project and found one in org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping.

It turns out that this method handles negative timestamps correctly. Its source code is below.

private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    long remainder = (timestamp - offset) % windowSize;
    // handle both positive and negative cases
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    } else {
        return timestamp - remainder;
    }
}
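
For comparison only, the same result can be expressed with Math.floorMod from the Java standard library. This is a sketch rather than what the PR changes; it assumes windowSize > 0 and that timestamp - offset does not overflow, and the class and method names are illustrative.

```java
/** Illustrative only: branch-based calculation versus a Math.floorMod equivalent. */
class FloorModWindowStart {

    // Same logic as the WindowsGrouping method quoted above.
    static long branchWindowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        }
        return timestamp - remainder;
    }

    // floorMod returns a result in [0, windowSize) when windowSize > 0.
    static long floorModWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        // Check that both variants agree over a small range of inputs.
        for (long offset : new long[] {0, 3}) {
            for (long t = -30; t <= 30; t++) {
                if (branchWindowStart(t, offset, 7) != floorModWindowStart(t, offset, 7)) {
                    throw new IllegalStateException("mismatch at t=" + t);
                }
            }
        }
        System.out.println(floorModWindowStart(-8, 0, 7)); // -14
        System.out.println(floorModWindowStart(-1, 3, 7)); // -4
    }
}
```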

Brief change log

  • Fix getWindowStartWithOffset in TimeWindow.java

Verifying this change

This change is already covered by existing tests, such as the tests in flink-streaming-java (mvn clean verify).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

-        return timestamp - (timestamp - offset + windowSize) % windowSize;
+        return timestamp
+                - (timestamp - offset) % windowSize
+                - (windowSize & (timestamp - offset) >> 63);
Contributor

(windowSize & (timestamp - offset) >> 63) is hard to understand; it would be better to add more comments.

Contributor Author

Thanks for reviewing our code. We made a mistake: the class we wanted to change is org.apache.flink.streaming.api.windowing.windows.TimeWindow, not org.apache.flink.table.runtime.operators.window.TimeWindow.
In the new commit, we changed the intended class and updated the unit test.
Please ignore this change.
Thanks.

Contributor

@zjuwangg zjuwangg left a comment

Thanks for your efforts. We should add a test to verify the change, e.g. in TimeWindowUtilTest or somewhere similar.

@realdengziqi
Contributor Author

@zjuwangg
Thanks again for the review! We checked the unit test at org.apache.flink.streaming.runtime.operators.windowing.TimeWindowTest and found that it does not cover negative timestamps, so we added test cases by negating the values in the existing positive-only cases.
While running the unit tests, we found that our calculation makes a window exclude its own start time, which violates the window assignment principle. For readability and correctness, we therefore adopted the calculation used in org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping.
Given the code referenced above, we look forward to further discussion with you.
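
The start-time exclusion described above can be reproduced with a small sketch. It compares the bit-mask formula from the PR description with the branch-based calculation from WindowsGrouping at a timestamp that is an exact negative multiple of the window size. The class and method names are illustrative and not part of Flink.

```java
/** Illustrative only: boundary behaviour of the two window-start calculations. */
class BoundaryCheck {

    // Formula originally proposed in the PR description.
    static long bitMaskWindowStart(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    // Calculation borrowed from WindowsGrouping.
    static long branchWindowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        // -7 is itself a window start, so the remainder is 0, yet the bit-mask
        // version still subtracts a full window: [-14, -7) does not contain -7.
        System.out.println(bitMaskWindowStart(-7, 0, 7)); // -14
        System.out.println(branchWindowStart(-7, 0, 7));  // -7

        // Away from the boundary the two agree.
        System.out.println(bitMaskWindowStart(-6, 0, 7)); // -7
        System.out.println(branchWindowStart(-6, 0, 7));  // -7
    }
}
```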

Contributor Author

@realdengziqi realdengziqi left a comment

Code has been changed and tests have been added

Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -7);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-6, offset, 7), -7);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -7);
// ---
Contributor

Suggested change: remove the // --- line.

Contributor Author

Thanks, we removed this comment line in the new commit a2fd873.

Contributor

@zjuwangg zjuwangg left a comment

Thanks for your efforts, the fix and test look good to me. But as you pointed out, if I understand correctly, the method getWindowStartWithOffset in org.apache.flink.table.runtime.operators.window.TimeWindow would have the same issue. I would suggest fixing it here or opening a new Jira ticket.

Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -4);
// ---
Contributor

ditto

Contributor Author

Thanks, this line was also removed in the new commit a2fd873

@realdengziqi
Contributor Author

@zjuwangg Thanks, we addressed this issue in the original jira, and made changes to getWindowStartWithOffset in org.apache.flink.table.runtime.operators.window.TimeWindow in a new commit a2fd873. Looking forward to more discussion.

Contributor

@zjuwangg zjuwangg left a comment

LGTM. @MartijnVisser, could you take a look and merge it if it's OK?

@MartijnVisser
Contributor

@zjuwangg Unfortunately this is not my expertise, so I can't review and merge it.

@realdengziqi
Contributor Author

@fapaul Hi, could you take a look and merge it if it's OK? Thanks.

Contributor

@fapaul fapaul left a comment

Thanks for the great work. Overall the change looks good to me, but I left some inline comments.

Please also squash the commits together.

     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
-        return timestamp - (timestamp - offset + windowSize) % windowSize;
+        long remainder = (timestamp - offset) % windowSize;
Contributor

I know you did not introduce this method, but what do you think about removing it in favor of the other TimeWindow#getWindowStartWithOffset? I am afraid it is easy to miss that we have the same method twice in the code base.

I am also fine with doing this as a follow-up if you think it is too much for this PR.

Contributor Author

Thanks for reviewing this code. I think it involves reorganizing the code, which may be too much for this PR. If necessary, we are willing to open a new Jira ticket and PR to deal with it. We look forward to working with you again.

Contributor

Yes, no worries, this can also be done outside of the PR.

@realdengziqi realdengziqi force-pushed the windowStart-fix branch 2 times, most recently from 74f16ec to 3559fb4 Compare March 17, 2022 21:32
@fapaul
Contributor

fapaul commented Mar 18, 2022

@realdengziqi it would be good to resolve the conflicts first and rebase onto the latest master branch. In the Flink community we always use rebase over merge. Can you also create backport PRs for the release-1.14 and release-1.15 branches?

… org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow . Added test cases in unit test TimeWindowTest.

Co-authored-by: Lin WanNi <linwanniderz@foxmail.com>
Co-authored-by: Guo YuanFang <1650213825@qq.com>
@realdengziqi
Contributor Author

@fapaul Thanks for the guidance. We rebased onto the latest master branch and force-pushed. We also created backport PRs for the release-1.14 and release-1.15 branches, and these PRs have passed the Azure build.
