[BEAM-3489] add PubSub messageId in PubsubMessage #8370
|
R: @lukecwik |
|
retest this please |
Changing the encoding of the coder will break pipeline update, since the encoding used before is not compatible with the new encoding. You'll want to:
1. Add a new coder that is like PubsubMessageWithAttributesCoder.
2. Add a method like readMessagesWithAttributes that uses the coder that you created in step 1.
Repeat steps 1 and 2 for readMessages if you also want to create a readMessagesWithMessageId.
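To illustrate why an in-place encoding change is incompatible, here is a minimal sketch in plain Java. It does not use Beam's coder classes: `EncodingCompatSketch` and its methods are hypothetical stand-ins, and the wire formats (a length-prefixed payload, optionally followed by a UTF string) are assumptions chosen only for the demonstration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for two wire formats; not Beam's actual coders. */
final class EncodingCompatSketch {

  /** "Old" format: length-prefixed payload only. */
  static byte[] encodeOld(byte[] payload) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(payload.length);
      out.write(payload);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** "New" format: length-prefixed payload followed by the message id. */
  static byte[] encodeNew(byte[] payload, String messageId) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(payload.length);
      out.write(payload);
      out.writeUTF(messageId);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes the "new" format and returns the message id that was read. */
  static String decodeNewMessageId(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      // Fails with EOFException if the bytes were written in the old format.
      return in.readUTF();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch, bytes written by `encodeOld` cannot be read by `decodeNewMessageId`, which is the kind of failure an updated pipeline could hit on state persisted with the previous encoding. Keeping the old coder untouched and adding a separate new one avoids that.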
You'll also want to update the implementation in the Dataflow specific PubsubReader here when constructing the PubsubMessage to pass in the message id as well:
Line 124 in fff46e1
| new PubsubMessage( |
a2d1aff to
6f781b2
Compare
|
Thanks - I've added a few changes based on your comment. It's my first PR so sorry for all the questions!
|
| public PubsubMessage(byte[] payload, Map<String, String> attributes) { | ||
| this.message = payload; | ||
| this.attributes = attributes; | ||
| this.messageId = ""; |
I think you should set this to null.
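A minimal sketch of that suggestion, assuming a simplified message class. `MessageSketch` is a hypothetical stand-in rather than the actual PubsubMessage, and the reviewer's reasoning is not stated; one plausible motivation is that null distinguishes "no id available" from an id that happens to be empty.

```java
import java.util.Map;

/** Hypothetical simplified message class; not the actual Beam PubsubMessage. */
final class MessageSketch {
  private final byte[] payload;
  private final Map<String, String> attributes;
  private final String messageId; // null when no message id is available

  /** Constructor for callers that have no message id. */
  MessageSketch(byte[] payload, Map<String, String> attributes) {
    this(payload, attributes, null);
  }

  MessageSketch(byte[] payload, Map<String, String> attributes, String messageId) {
    this.payload = payload;
    this.attributes = attributes;
    this.messageId = messageId;
  }

  byte[] getPayload() {
    return payload;
  }

  Map<String, String> getAttributes() {
    return attributes;
  }

  /** Returns the message id, or null if none was provided. */
  String getMessageId() {
    return messageId;
  }
}
```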
lukecwik
left a comment
Glad to help with the questions.
Your current implementation for PubsubMessageWithMessageIdCoder is correct. You don't need to encode/decode an empty map as there is no point.
Tests would typically go here:
https://github.com/apache/beam/tree/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub
You could add an integration test that shows that reading messages with a message id works here:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java
You could also add unit tests for your coders similar to
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
Since your coder is not saying it's deterministic or consistent with equals, you can ignore testing for those. Feel free to add additional unit tests for the other pubsub coders that are missing if you want.
| import org.apache.beam.sdk.coders.StringUtf8Coder; | ||
| import org.apache.beam.sdk.values.TypeDescriptor; | ||
|
|
||
| /** A coder for PubsubMessage including attributes. */ |
Please update the comment to state that you also include the message id.
| import org.apache.beam.sdk.coders.StringUtf8Coder; | ||
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; | ||
|
|
||
| /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */ |
Please update this comment as well to state that it contains the message id.
| * Class representing a Pub/Sub message. Each message contains a single message payload and a map of | ||
| * attached attributes. | ||
| * Class representing a Pub/Sub message. Each message contains a single message payload, a map of | ||
| * attached attributes, and an optional messageId. |
| * attached attributes, and an optional messageId. | |
| * attached attributes, and a message id. |
|
Looking good so far. With minor comment updates and some tests, this will be merged. |
|
@thinhha any updates? |
|
Sorry for the delay! I've added some changes based on your comment. I've added an additional integration test, hopefully it will be OK. |
|
Run Java PostCommit |
|
I kicked off the postcommit to validate the IT that you added. |
| signal.signalSuccessWhen( | ||
| messages.getCoder(), | ||
| pubsubMessages -> | ||
| pubsubMessages.stream().noneMatch(m -> Strings.isNullOrEmpty(m.getMessageId())))); |
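The success condition in the snippet above can be sketched in isolation with plain Java. `MessageIdCheckSketch` is a hypothetical helper that works on a list of id strings instead of PubsubMessage objects, and it spells out the null-or-empty check rather than relying on Guava's `Strings.isNullOrEmpty`.

```java
import java.util.List;

/** Hypothetical helper mirroring the shape of the IT's success predicate. */
final class MessageIdCheckSketch {

  /** Returns true when no id in the list is null or empty. */
  static boolean allHaveMessageIds(List<String> messageIds) {
    return messageIds.stream().noneMatch(id -> id == null || id.isEmpty());
  }
}
```

One property worth keeping in mind with this shape: `noneMatch` is vacuously true for an empty collection, so a predicate like this only says something useful once at least one message has been read.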
It looks like the new IT doesn't pass:
https://builds.apache.org/job/beam_PostCommit_Java_PR/169/testReport/junit/org.apache.beam.sdk.io.gcp.pubsub/PubsubReadIT/testReadPubsubMessageId/
Looks like the IT passed this time. Could you take another look?
Thanks!
|
Run Java PostCommit |
|
Run Java PostCommit |
|
Run Java PreCommit |
|
Run Java PostCommit |
2 similar comments
|
Run Java PostCommit |
|
Run Java PostCommit |
|
Run Java PostCommit |
|
Run Java PostCommit |
|
Run Java PostCommit |
1 similar comment
|
Run Java PostCommit |
|
Run Java PostCommit |
|
Run Java PreCommit |
|
Run Java PostCommit |
1 similar comment
|
Run Java PostCommit |
e669585 to
8af31b1
Compare
|
Run Java PostCommit |
|
Hi @lukecwik, looks like the non-messageId coders are hard-coded in Line 1088 of This means that while the PubsubMessage is created with the messageId field populated when using Looks like How do you think this case can be handled? |
|
Add another constructor to PubsubUnboundedSource that takes a boolean indicating whether you need the message id, and update getOutputCoder to look at this property when selecting between the 4 pubsub message coder variants. You'll also want to update Line 1146 in 667bf85
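The selection logic described here can be sketched as a function of two flags. This is a hedged illustration only: `CoderSelectionSketch`, the `Variant` enum, and the flag names are hypothetical, and the enum constants stand in for the four coder classes rather than naming them.

```java
/** Hypothetical sketch of choosing among four coder variants from two flags. */
final class CoderSelectionSketch {

  /** Stand-ins for the four pubsub message coder variants. */
  enum Variant {
    PAYLOAD_ONLY,
    WITH_ATTRIBUTES,
    WITH_MESSAGE_ID,
    WITH_ATTRIBUTES_AND_MESSAGE_ID
  }

  /** Picks the variant matching what the reader was asked to populate. */
  static Variant select(boolean needsAttributes, boolean needsMessageId) {
    if (needsAttributes) {
      return needsMessageId ? Variant.WITH_ATTRIBUTES_AND_MESSAGE_ID : Variant.WITH_ATTRIBUTES;
    }
    return needsMessageId ? Variant.WITH_MESSAGE_ID : Variant.PAYLOAD_ONLY;
  }
}
```

Deriving the coder from the same flags that control how the message is built keeps the two in step, so a message created with a message id is not encoded by a coder that drops it.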
|
...le-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
|
It turns out that one of the Dataflow services doesn't output the message id which makes this change not work with Dataflow until the service is updated. @udim is working on getting that resolved. |
a2d935c to
c02cdbd
Compare
|
Thanks for the update @lukecwik. Good thing the IT caught this! I've added the changes we discussed above. We can rerun the postCommit tests once the dataflow backend change is complete. |
|
Run Java PostCommit |
|
Run Java PostCommit |
|
All tests have passed. Friendly ping for review @lukecwik. |
|
Thanks a lot for contributing this and dealing with the additional complexity that Dataflow added. |
Make the recordId field from PubsubClient.IncomingMessage available as a new field called messageId in PubsubMessage.
Updated the coder for PubsubMessage to encode and decode the messageId.