-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for global sequence processing to the "ordered" extension in Java SDK #32540
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @m-trieu for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Reminder, please take a look at this pr: @m-trieu |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @damondouglas for label java. Available commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to try to take this in 2 chunks, starting with the side input pieces.
The code itself looks good so far, thanks for the thorough tests in particular
...ns/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java
Show resolved
Hide resolved
...ns/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ContiguousSequenceRange.java
Show resolved
Hide resolved
...d/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java
Show resolved
Hide resolved
.../src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java
Show resolved
Hide resolved
.../src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/SequenceRangeAccumulator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - mostly LGTM outside of pending comments (mostly cosmetic)
...ions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java
Outdated
Show resolved
Hide resolved
...ions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequenceTracker.java
Outdated
Show resolved
Hide resolved
...dered/src/main/java/org/apache/beam/sdk/extensions/ordered/GlobalSequencesProcessorDoFn.java
Show resolved
Hide resolved
...xtensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java
Outdated
Show resolved
Hide resolved
…/extensions/ordered/combiner/DefaultSequenceCombiner.java Co-authored-by: Danny McCormick <dannymccormick@google.com>
…sequence tracker.
… to trigger the re-evaluation. Fixed accidentally disabled unit tests.
…ateness of the input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM, thanks! I noticed that the java test suite failed, but it looks unrelated to your change (FlinkRequiresStableInputTest
failed). I'm rerunning to hopefully get a green signal before merging
… in Java SDK (apache#32540) * Initial changes to support processing global sequences. * Refactor the DoFns out of the transform and into a class hierarchy. * Next round of implementation of Global Sequence handling. * Added ticker timers in global sequence processing. * Corrected the emission batch logic. * Reworked some tests and fixed the batch output logic. * Pluggable combiner for the global sequence. * First iteration of the efficient merging accumulator * Mostly complete implementation of the accumulator and corresponding tests. * Additional round of test refinements. * Added logic to DQL the records below the global sequence range. * Added providing a global sequence combiner through a handler. * Added SequenceRangeAccumulatorCoder and tests. Improved logic of creating timers. * Fixed logging levels (moved them to "trace") on several transforms. * Round of code improvements and cleanups. * Tests to verify that the the global sequence is correctly produced by the transform. * Added batch processing verification to the global sequence processing. * A round of documentation update and minor clean up. * Fixed the description in CHANGES.md * Polish by "spotless" * Polish by "spotless" * Removed unneeded logging configuration file. * Made ContiguousSequenceRange open ended. * Removed details from 2.60.0 section in CHANGES.md. * Update sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/combiner/DefaultSequenceCombiner.java Co-authored-by: Danny McCormick <dannymccormick@google.com> * Fixed spotless related errors. * Added a note about the new functionality to CHANGES.md * Added clarification around the data structure used in the sequence combiner. * Added clarification around the data structure used in the sequence combiner. * Fixed the problem with allowed lateness being set to 0 in the global sequence tracker. * Parameterized the GlobalSequenceTracker with the max number of events to trigger the re-evaluation. Fixed accidentally disabled unit tests. * Made the event timer used to wait for the event arrival respect the lateness of the input. * Created new failure reason code - "before initial sequence" --------- Co-authored-by: Danny McCormick <dannymccormick@google.com>
Global sequence processing is used to ensure that events for a given key are only processed when it's guaranteed that they all the elements for this particular have been received.
Consider a PCollection which contains these event tuples (first element of the tuple is the global sequence number):
[1, key1, data], [2, key2, data], [3, key1, data], [4, key1, data], [5, key2, data], [7, key2, data]. Elements for key1 must be processed in the following order: 1, 3, 4. Elements for key2 must be processed in the following order: 2, 5. Event with sequence 7 can't be processed because there is a missing sequence 6.
The approach used to implement ordered processing in the presence of global sequencing:
This high level diagram illustrates the overall approach.
There are dedicated unit tests to cover both per-key and global sequence processing. Please refer to them to understand the details of use cases.
Note that the batch unit tests for global processing don't automatically run under global sequencing. This is due to the apparent incorrectness of the DirectRunner implementation (it is supposed to block processing event processing DoFns until the side input is calculated once). The batch processing tests were successfully run manually using DataflowRunner. Additional work will be needed to either fix the DirectRunner, switch to PrismRunner (when it supports all the primitives used in this transform), or enable test to run a DataflowRunner.