Add combining state support #22826
Codecov Report
@@ Coverage Diff @@
## master #22826 +/- ##
==========================================
- Coverage 73.93% 73.88% -0.06%
==========================================
Files 713 713
Lines 94151 94318 +167
==========================================
+ Hits 69610 69683 +73
- Misses 23256 23347 +91
- Partials 1285 1288 +3
Note: the portable precommit is currently perma-red due to a licensing issue. EDIT: it has since healed, and the retry worked.
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Run GoPortable PreCommit
I think my last comment disappeared when this was a draft PR.
acc, ok, err := s.readAccumulator(p)
if !ok || err != nil {
	var val T3
	return val, ok, err
Since this function returns an empty list in this if statement, do we expect users to specify the type as []int or just int, and return []T3 from here?
This won't actually return an empty list; it will return a variable set to the zero value of T3. So if T3 is an int, it will return 0.
The expectation is that Read will always return whatever type ExtractOutput produces.
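To illustrate the zero-value point, here is a minimal, standalone sketch. It is not the code from this PR: readOrZero and its parameters are hypothetical stand-ins for the Read method and s.readAccumulator, and it only shows how `var val T` behaves for a generic type parameter.

```go
package main

import "fmt"

// readOrZero is a hypothetical stand-in for the Read method under review.
// A nil pointer models "no accumulator stored". In that case, or on error,
// it returns the zero value of T rather than an empty slice.
func readOrZero[T any](stored *T, err error) (T, bool, error) {
	if stored == nil || err != nil {
		var val T // zero value of T: 0 for int, "" for string, nil for a slice
		return val, stored != nil, err
	}
	return *stored, true, nil
}

func main() {
	// Nothing stored: the caller gets the zero value of int and ok == false.
	v, ok, err := readOrZero[int](nil, nil)
	fmt.Println(v, ok, err) // 0 false <nil>

	// Something stored: the caller gets the stored value and ok == true.
	n := 42
	v, ok, err = readOrZero(&n, nil)
	fmt.Println(v, ok, err) // 42 true <nil>
}
```

Under this sketch the caller instantiates the type parameter with the element type (int), not a slice type, so the early return yields 0 rather than an empty list.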
(Or, if there's no ExtractOutput function defined, then whatever MergeAccumulators produces. This mirrors the behavior of a normal combineFn.)
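A rough sketch of that rule, without the Beam SDK: meanFn, meanAccum, readMean, sumMerge, and readSum are all hypothetical names invented for illustration. The first combiner defines ExtractOutput, so a read yields its output type (float64); the second has no ExtractOutput, so a read yields the accumulator type that the merge function produces (int).

```go
package main

import "fmt"

// meanAccum is a hypothetical accumulator type for a mean combiner.
type meanAccum struct {
	Sum   int
	Count int
}

// meanFn is a hypothetical combiner that defines ExtractOutput.
type meanFn struct{}

func (meanFn) CreateAccumulator() meanAccum { return meanAccum{} }

func (meanFn) AddInput(a meanAccum, v int) meanAccum {
	return meanAccum{Sum: a.Sum + v, Count: a.Count + 1}
}

func (meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
	return meanAccum{Sum: a.Sum + b.Sum, Count: a.Count + b.Count}
}

// ExtractOutput converts the accumulator into the output type, float64.
func (meanFn) ExtractOutput(a meanAccum) float64 {
	if a.Count == 0 {
		return 0
	}
	return float64(a.Sum) / float64(a.Count)
}

// readMean models a read on state backed by meanFn: the result has the
// type ExtractOutput produces (float64), not the accumulator type.
func readMean(inputs []int) float64 {
	var fn meanFn
	acc := fn.CreateAccumulator()
	for _, v := range inputs {
		acc = fn.AddInput(acc, v)
	}
	return fn.ExtractOutput(acc)
}

// sumMerge is a hypothetical merge-only combiner with no ExtractOutput.
func sumMerge(a, b int) int { return a + b }

// readSum models a read on state backed by sumMerge: with no ExtractOutput,
// the result has the accumulator type (int).
func readSum(inputs []int) int {
	acc := 0
	for _, v := range inputs {
		acc = sumMerge(acc, v)
	}
	return acc
}

func main() {
	fmt.Println(readMean([]int{1, 2, 3, 6}))
	fmt.Println(readSum([]int{1, 2, 3}))
}
```

In both cases the type the user names for the read result is the scalar output type, which is why the early return in the reviewed snippet can hand back a plain zero value.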
Got it, then I think this should be fine as it is.
LGTM
This continues to grow our state support by adding combining state support. Once this is in, I'll add a follow-up PR with an integration test. In the meantime, I tested manually by running the hacked wordcount found here - https://github.com/damccorm/beam/pull/88/files#diff-46f0bd8f9f6b541e840079960d684ba78e6513f1293b673d08faaec94d48a234 - and it produced output like:
Part of #22736