-
Notifications
You must be signed in to change notification settings - Fork 812
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: Make batch request results independent #3457
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
pubsub/pubsub.go
Outdated
for { | ||
select { | ||
case msgs, ok := <-resultChannel.msgs: | ||
if !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't this lose errors?
I.e., if getNextBatch
gets an error, and writes it to resultChannel.err, and then immediately closes resultChannel.msgs, I think this select
could end up here, and we'd never receive the error.
Instead of returning a struct that contains two channels, I think it might be better to return a single channel, which delivers structs (where the struct can either have messages or an error).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there might be a bit more to it...
So, given that g.Wait
in getNextBatch
will return only after each goroutine has exited, it means that any messages sent, will certainly happen before Wait
returns.
So when Wait
returns, if there was an error it will be written to the error channel which is not buffered . So, this will block the execution until it is consumed. So it won't lose any errors since it won't move to close the channel unless the error gets consumed.
But you may be right about the flow being simpler returning a single channel. I will try this out
@vangent Thank you for your suggestions! I have refactored the execution flow to address them. This is what you get when coding tired after midnight 😄 .. Can you please take an other look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't taken another look, but the tests are still failing with a data race.
You should be able to reproduce this locally with -race
....
WARNING: DATA RACE
Read at 0x00c0000bc6b8 by goroutine 2559:
gocloud.dev/pubsub.(*secondReceiveBlockedDriverSub).ReceiveBatch()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub_test.go:291 +0x38
gocloud.dev/pubsub.(*Subscription).getNextBatch.func1.1()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:674 +0x128
gocloud.dev/internal/retry.call()
/Users/runner/work/go-cloud/go-cloud/internal/retry/retry.go:52 +0x10c
gocloud.dev/internal/retry.Call()
/Users/runner/work/go-cloud/go-cloud/internal/retry/retry.go:40 +0x140
gocloud.dev/pubsub.(*Subscription).getNextBatch.func1()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:670 +0x58
golang.org/x/sync/errgroup.(*Group).Go.func1()
/Users/runner/go/pkg/mod/golang.org/x/sync@v0.7.0/errgroup/errgroup.go:78 +0x7c
Previous write at 0x00c0000bc6b8 by goroutine 2560:
gocloud.dev/pubsub.(*secondReceiveBlockedDriverSub).ReceiveBatch()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub_test.go:291 +0x4c
gocloud.dev/pubsub.(*Subscription).getNextBatch.func1.1()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:674 +0x128
gocloud.dev/internal/retry.call()
/Users/runner/work/go-cloud/go-cloud/internal/retry/retry.go:52 +0x10c
gocloud.dev/internal/retry.Call()
/Users/runner/work/go-cloud/go-cloud/internal/retry/retry.go:40 +0x140
gocloud.dev/pubsub.(*Subscription).getNextBatch.func1()
/Users/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:670 +0x58
golang.org/x/sync/errgroup.(*Group).Go.func1()
/Users/runner/go/pkg/mod/golang.org/x/sync@v0.7.0/errgroup/errgroup.go:78 +0x7c
@vangent the race condition was on test code now... I had forgotten to save the |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #3457 +/- ##
==========================================
+ Coverage 73.38% 73.42% +0.04%
==========================================
Files 113 113
Lines 14959 14964 +5
==========================================
+ Hits 10977 10987 +10
+ Misses 3208 3204 -4
+ Partials 774 773 -1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pubsub.go looks good now, just a nit on the test. Thanks!
pubsub/pubsub_test.go
Outdated
) | ||
// set the false calculated subscription batch size to force 2 batches to be called | ||
s.runningBatchSize = 2 | ||
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded timings might be racy.
Could secondReceiveBlockedDriverSub
just wait forever (or maybe until the passed-in ctx
is Done
) on the second call, and s.Receive
wait forever (no context.WithTimeout
)? Without your change, the test would hang and eventually timeout; after it, I think the receive should work ~right away (getting the first message) and then exit successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I opted in for context timeout because in the case of failure, I didn't want the test to hang for a long amount of time and get a faster failure.
Anyway, I changed it to how you proposed with the wait-forever approach and made sure that on master the test hangs.
This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [gocloud.dev](https://redirect.github.com/google/go-cloud) | `v0.37.0` -> `v0.39.0` | [![age](https://developer.mend.io/api/mc/badges/age/go/gocloud.dev/v0.39.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/gocloud.dev/v0.39.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/gocloud.dev/v0.37.0/v0.39.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/gocloud.dev/v0.37.0/v0.39.0?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- ### Release Notes <details> <summary>google/go-cloud (gocloud.dev)</summary> ### [`v0.39.0`](https://redirect.github.com/google/go-cloud/releases/tag/v0.39.0) [Compare Source](https://redirect.github.com/google/go-cloud/compare/v0.38.0...v0.39.0) #### BREAKING CHANGE (AWS only, V1 vs V2 SDK) Context: AWS has [announced maintenance mode](https://aws.amazon.com/blogs/developer/announcing-end-of-support-for-aws-sdk-for-go-v1-on-july-31-2025/) for the Go V1 SDK. Go CDK has changed the default SDK for URLs across all modules except `docstore/awsdynamodb` to be V2 (previously you needed to add `awssdk=v2` to the URL to get V2). Most URLs should continue to work, but in some cases you may need to add `awssdk=v1` to force V1 explicitly. Also, concrete type constructors (e.g., `OpenBucket`) for V1 (again, except `docstore/awsdynamodb`) have been marked deprecated; please migrate to using the V2 versions (e.g., `OpenBucketV2`). Our tentative plan is to remove support for V1 in early 2025; please [file a bug](https://redirect.github.com/google/go-cloud/issues/new/choose) if you have concerns about that. #### What's Changed - pubsub: Make batch request results independent by [@​mitsos1os](https://redirect.github.com/mitsos1os) in [https://github.com/google/go-cloud/pull/3457](https://redirect.github.com/google/go-cloud/pull/3457) - docstore/all: Add support for boolean filter by [@​ybourgery](https://redirect.github.com/ybourgery) in [https://github.com/google/go-cloud/pull/3464](https://redirect.github.com/google/go-cloud/pull/3464) - aws/all: Mark V1 constructors deprecated. by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3466](https://redirect.github.com/google/go-cloud/pull/3466) - aws/all: Change the default for AWS URLs from V1 to V2. by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3465](https://redirect.github.com/google/go-cloud/pull/3465) - all: update to go version 1.23 by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3467](https://redirect.github.com/google/go-cloud/pull/3467) #### New Contributors - [@​mitsos1os](https://redirect.github.com/mitsos1os) made their first contribution in [https://github.com/google/go-cloud/pull/3457](https://redirect.github.com/google/go-cloud/pull/3457) - [@​dependabot](https://redirect.github.com/dependabot) made their first contribution in [https://github.com/google/go-cloud/pull/3448](https://redirect.github.com/google/go-cloud/pull/3448) **Full Changelog**: google/go-cloud@v0.38.0...v0.39.0 ### [`v0.38.0`](https://redirect.github.com/google/go-cloud/releases/tag/v0.38.0) [Compare Source](https://redirect.github.com/google/go-cloud/compare/v0.37.0...v0.38.0) **blob** - **all**: Fix panics if reader recreation fails after Seek by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3425](https://redirect.github.com/google/go-cloud/pull/3425) - **all**: Convert errors in `Open()` into appropriate fs errors by [@​milescrabill](https://redirect.github.com/milescrabill) in [https://github.com/google/go-cloud/pull/3443](https://redirect.github.com/google/go-cloud/pull/3443) - **s3blob**: Fix Copy to work with keys that need escaping by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3403](https://redirect.github.com/google/go-cloud/pull/3403) - **azureblob**: Do not panic if Content-Length and Content-Range are missing by [@​chancez](https://redirect.github.com/chancez) in [https://github.com/google/go-cloud/pull/3445](https://redirect.github.com/google/go-cloud/pull/3445) - **fileblob**: Allow customization of the FileMode by [@​vangent](https://redirect.github.com/vangent) in [https://github.com/google/go-cloud/pull/3426](https://redirect.github.com/google/go-cloud/pull/3426) **pubsub** - **awssnssqs**: Add support for setting FIFO message metadata by [@​bartventer](https://redirect.github.com/bartventer) in [https://github.com/google/go-cloud/pull/3435](https://redirect.github.com/google/go-cloud/pull/3435) - **kafkapubsub**: Configuring key_name when OpenTopicURL by [@​ssetin](https://redirect.github.com/ssetin) in [https://github.com/google/go-cloud/pull/3404](https://redirect.github.com/google/go-cloud/pull/3404) - **rabbitpubsub**: Add query string set the qos prefetch count by [@​peczenyj](https://redirect.github.com/peczenyj) in [https://github.com/google/go-cloud/pull/3431](https://redirect.github.com/google/go-cloud/pull/3431) - **rabbitpubsub**: Add query string to set the routing key from metadata by [@​peczenyj](https://redirect.github.com/peczenyj) in [https://github.com/google/go-cloud/pull/3433](https://redirect.github.com/google/go-cloud/pull/3433) - **rabbitpubsub**: Wrap pubsub rabbitmq errors by [@​peczenyj](https://redirect.github.com/peczenyj) in [https://github.com/google/go-cloud/pull/3437](https://redirect.github.com/google/go-cloud/pull/3437) **docstore** - **all**: Fix offset handling and extend test coverage by [@​bartventer](https://redirect.github.com/bartventer) in [https://github.com/google/go-cloud/pull/3409](https://redirect.github.com/google/go-cloud/pull/3409) - **awsdynamodb**: Ensure Next returns EOF when no more items by [@​bartventer](https://redirect.github.com/bartventer) in [https://github.com/google/go-cloud/pull/3406](https://redirect.github.com/google/go-cloud/pull/3406) - **mongodocstore**: Update Mongo dialer when MONGO_SERVER_URL rotates by [@​concaf](https://redirect.github.com/concaf) in [https://github.com/google/go-cloud/pull/3429](https://redirect.github.com/google/go-cloud/pull/3429) #### New Contributors - [@​ssetin](https://redirect.github.com/ssetin) made their first contribution in [https://github.com/google/go-cloud/pull/3404](https://redirect.github.com/google/go-cloud/pull/3404) - [@​concaf](https://redirect.github.com/concaf) made their first contribution in [https://github.com/google/go-cloud/pull/3429](https://redirect.github.com/google/go-cloud/pull/3429) - [@​peczenyj](https://redirect.github.com/peczenyj) made their first contribution in [https://github.com/google/go-cloud/pull/3431](https://redirect.github.com/google/go-cloud/pull/3431) - [@​chancez](https://redirect.github.com/chancez) made their first contribution in [https://github.com/google/go-cloud/pull/3445](https://redirect.github.com/google/go-cloud/pull/3445) - [@​milescrabill](https://redirect.github.com/milescrabill) made their first contribution in [https://github.com/google/go-cloud/pull/3443](https://redirect.github.com/google/go-cloud/pull/3443) - [@​samlaf](https://redirect.github.com/samlaf) made their first contribution in [https://github.com/google/go-cloud/pull/3450](https://redirect.github.com/google/go-cloud/pull/3450) **Full Changelog**: google/go-cloud@v0.37.0...v0.38.0 </details> --- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Enabled. ♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/open-feature/flagd). <!--renovate-debug:eyJjcmVhdGVkSW5WZXIiOiIzOC43NC4xIiwidXBkYXRlZEluVmVyIjoiMzguNzQuMSIsInRhcmdldEJyYW5jaCI6Im1haW4iLCJsYWJlbHMiOltdfQ==--> Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Allow batch requests to return results independently without one blocking the other.
This will in turn allow
Subscription.Receive
to return a message from a request (since theReceive
function signature returns only one message), without waiting for all the requests from the batch to complete.Except for this, package behavior should remain as is...
Fixes: #3456