@gemelen
Contributor

@gemelen gemelen commented Jul 11, 2022

Add orderingKey from the proto PubsubMessage to the Beam PubsubMessage.
Add a new coder for a PubsubMessage with all fields.
Propagate this new view of a PubsubMessage to PubsubIO as a new API call.
Add a unit test for the new coder.


@gemelen
Contributor Author

gemelen commented Jul 11, 2022

@damccorm Hi, could you please suggest a reviewer for this changeset?

@gemelen gemelen marked this pull request as ready for review July 11, 2022 08:26
@gemelen
Contributor Author

gemelen commented Jul 11, 2022

Addresses BEAM-13592

@damccorm
Contributor

@johnjcasey who is a good reviewer here?

@johnjcasey
Contributor

I am, I'll take a look

* PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId()
* messageId} and {@link PubsubMessage#getOrderingKey() orderingKey} from PubSub.
*/
public static Read<PubsubMessage> readMessagesWithAllAttributesAndMessageIdAndOrderingKey() {
Contributor

Can we remove "all" from the message name to keep it consistent with the above method?
Also, based on the above method, do we need .setNeedsAttributes(true) here?
Finally, should we name the coder PubsubMessageCoder, given that the above method calls it PubsubMessageWithAttributesAndMessageIdCoder?

Contributor Author

@gemelen gemelen Jul 12, 2022

I think that since this coder captures all fields of an incoming PubsubMessage, it can be named without listing those fields in its name. By the same logic, the method would be better named readMessages, but that name is already taken by another implementation (one that reads only the payload of a PubsubMessage).

I agree that readMessagesWithAllAttributesAndMessageIdAndOrderingKey then should be named readMessagesWithAttributesAndMessageIdAndOrderingKey, so I'm updating it to this.

Regarding the question

> Also, based on the above method, do we need .setNeedsAttributes(true) here?

the newly added method doesn't call setNeedsAttributes; it calls setNeedsOrderingKey instead:

public static Read<PubsubMessage> readMessagesWithAttributesAndMessageIdAndOrderingKey() {
  return Read.newBuilder()
      .setCoder(PubsubMessageCoder.of())
      .setNeedsOrderingKey(true)
      .build();
}

// A message's payload cannot be null
private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
// A message's attributes can be null.
private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
Contributor

Can the attribute map be null, or can elements within the map be null?

Contributor Author

This part is identical to the other coders, so I assume that the attributes field is either null or contains something.
My guess is that elements could be null too, but handling that would need to be changed in all PubsubMessage-related coders.
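
The distinction raised in this exchange (a nullable map versus nullable elements inside the map) can be pictured with a small sketch. It is a simplified model written against plain java.io, not Beam's Coder API; the class name `NullableAttributesSketch` and the wire layout are assumptions made only for illustration. The map as a whole is guarded by a presence flag, while individual keys and values are written directly and therefore cannot be null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model only; this is plain java.io, not Beam's Coder API. */
final class NullableAttributesSketch {

  /** Writes a presence flag, then the entry count and each key/value pair. */
  static byte[] encode(Map<String, String> attributes) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(attributes != null); // the map as a whole may be null
      if (attributes != null) {
        out.writeInt(attributes.size());
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
          // writeUTF throws NullPointerException for a null key or value,
          // so null elements are NOT supported by this encoding.
          out.writeUTF(entry.getKey());
          out.writeUTF(entry.getValue());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads back what encode wrote; returns null if the presence flag is false. */
  static Map<String, String> decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      if (!in.readBoolean()) {
        return null;
      }
      int size = in.readInt();
      Map<String, String> attributes = new LinkedHashMap<>();
      for (int i = 0; i < size; i++) {
        String key = in.readUTF();
        String value = in.readUTF();
        attributes.put(key, value);
      }
      return attributes;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this model, supporting null elements would mean adding a presence flag per key and value, which changes the byte layout; that is consistent with the remark that such a change would have to be made across every related coder at once.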

PAYLOAD_CODER.encode(value.getPayload(), outStream);
ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream);
MESSAGE_ID_CODER.encode(value.getMessageId(), outStream);
// TODO(discuss what to do with publish_time field)
Contributor

What needs to be discussed here?

Contributor Author

I'd like to discuss if my approach here is correct:

This changeset aims to add only the ordering key property of a PubsubMessage, but it's not possible to omit publish_time, since it precedes ordering_key in the PubsubMessage protobuf schema and thus in the byte stream.

The changeset doesn't expose publish_time and discards it after deserializing the byte stream in the decode function. Also, in the encode function of the relevant coder I write a meaningless default instance of protobuf.Timestamp just to add the required number of bytes to the serialized form of a PubsubMessage.

So, is this the correct approach, or could you suggest what should be done with this field instead?
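
One way to picture the concern is a positional encoding, where fields are written in a fixed order without tags, so a reader can only reach a later field by consuming every earlier one. The sketch below is a minimal model in plain java.io under that assumption; it is not the coder from this changeset, and the names `PositionalFieldsSketch`, `Decoded`, and `PLACEHOLDER_PUBLISH_TIME` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Illustrative model only: fixed field order, with a placeholder for an unexposed field. */
final class PositionalFieldsSketch {

  /** Hypothetical stand-in for a default timestamp value. */
  static final long PLACEHOLDER_PUBLISH_TIME = 0L;

  /** The fields this sketch exposes; publish time is deliberately absent. */
  static final class Decoded {
    final byte[] payload;
    final String messageId;
    final String orderingKey;

    Decoded(byte[] payload, String messageId, String orderingKey) {
      this.payload = payload;
      this.messageId = messageId;
      this.orderingKey = orderingKey;
    }
  }

  static byte[] encode(byte[] payload, String messageId, String orderingKey) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(payload.length);
      out.write(payload);
      out.writeUTF(messageId);
      // The placeholder occupies the slot that precedes the ordering key,
      // so the reader finds the ordering key at the expected position.
      out.writeLong(PLACEHOLDER_PUBLISH_TIME);
      out.writeUTF(orderingKey);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Decoded decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      String messageId = in.readUTF();
      in.readLong(); // publish time slot: read to advance the stream, then discarded
      String orderingKey = in.readUTF();
      return new Decoded(payload, messageId, orderingKey);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The trade-off in this model is that the placeholder costs a few bytes per message but keeps the layout stable if the skipped field is exposed later.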

@gemelen
Contributor Author

gemelen commented Jul 18, 2022

@johnjcasey could you please take another look at the changeset?

.build();
}

/**
Member

I believe that to support bounded writes you'll also need to add explicit copying of the orderingKey on line 1329 of this file (in the output.add(OutgoingMessage.of(... section of processElement).

@egalpin
Member

egalpin commented Jul 19, 2022

Relates to #21162. I think it may also be worth mentioning that there was a previous email thread[1] about the feature to support subscriptions where ordering is enabled, but Beam itself does not guarantee ordering so this could be a confusing user experience. I may be missing additional context which has developed since that 2-year-old thread though!

Writes with orderingKey should definitely be supported though :-)

[1] https://lists.apache.org/thread/1koqhmhr1tb5572cj3fq5b65dnwbg7fg

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class PubsubMessageCoder extends CustomCoder<PubsubMessage> {
Member

I noticed that the other coders, like PubsubMessagePayloadOnlyCoder, have specific names indicating the properties of the messages they encode. IMO the name PubsubMessageCoder implies that it's a generic coder for Pub/Sub messages. Following the naming convention in place, this class might be named PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder (which is comically long, so I'm not sure which is better haha).

Member

I believe you'll also need to register this in PubsubCoderProviderRegistrar

Member

Which I think actually implies that there will need to be a coder for all possible permutations (payload, messageId, attributes, orderingKey)

Member

+1 for giving this the comically long, specific name to be consistent

@egalpin
Member

egalpin commented Jul 20, 2022

I believe IncomingMessage and OutgoingMessage (defined in PubsubClient.java) also need to have their static constructors (of(..) methods, maybe there's a more proper name for these?) updated to also pull in message.getOrderingKey().

@egalpin
Member

egalpin commented Jul 20, 2022

At the risk of hijacking... Having support for orderingKey is high-priority for me as well, so I've been playing around with this locally. I can't for the life of me get it to work with Dataflow when attempting to publish messages with an ordering key. I did get it to work locally, where I could see (via the GCP UI) that published messages had orderingKey assigned. Something must be overriding it in Dataflow. Thought it might be worth mentioning.

I also had a lot of trouble with coders and getting the correct coder to be maintained throughout the publish process, specifically the coder would be altered to be PubsubMessageWithAttributesCoder after data went through PubsubMessages.ParsePayloadAsPubsubMessageProto. I'm definitely missing an understanding of how to make proper use of CoderRegistrar

@egalpin
Member

egalpin commented Jul 22, 2022

Ok the publish side does in fact work: https://lists.apache.org/thread/c6929ms0bjxtcw9ho4tdb5y3t8wnwnfy

I'd like to include those changes with the changes already added by @gemelen. Any opposition?

@Override
public List<CoderProvider> getCoderProviders() {
return ImmutableList.of(
CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of()),
Member

I'm not 100% confident in the order that is required here, or if order matters. I couldn't find documentation around how coders are resolved when multiple coders are registered for the same class. I tried tracing the code but wasn't able to come up with a definitive answer

  TypeDescriptor.of(PubsubMessage.class),
- PubsubMessageWithAttributesAndMessageIdCoder.of()));
+ PubsubMessageWithAttributesAndMessageIdCoder.of()),
+ CoderProviders.forCoder(TypeDescriptor.of(PubsubMessage.class), PubsubMessageCoder.of()));
Member

Moved this code to be last in the registrar (rather than first); otherwise there are failures in the tests. @johnjcasey Do you have any further info on coder registrars and how a coder is resolved when multiple coders are registered for the same class?

Contributor

I do not, sorry

Contributor

Generally though, I would be surprised to have multiple coder types for a single type, though I don't know whether that is just my unfamiliarity with coders.

Member

@TheNeuralBit I think this is the area where there is a lack of clarity. As best as I can tell, there are multiple coders registered here for the same class (PubsubMessage.class). It seems like order matters, but I'm not sure of all the implications. For example, if I put the new coder first in the registrar, existing Pub/Sub unit tests fail with coder errors.

Member

@TheNeuralBit TheNeuralBit Aug 22, 2022

Thanks! I dug into the infrastructure here, and it looks like we will always just use the first coder registered for a given type, so I think registering these other three will be a no-op. In theory a coder could raise CannotProvideCoderException and we'd move on to the next one, but that's not going to happen here: PubsubMessageWithAttributesCoder will never do that.

I think the actually important change is just adding readMessagesWithAttributesAndMessageIdAndOrderingKey with the setCoder call.

It's worth noting that what we specify here will be used as the default in any PCollection<PubsubMessage> that's not created by one of the PubsubIO helper methods. So we should consider making it the one that will always work. There are implications for breaking pipeline update though if we change the default. My suggestion:

  • Leave this as-is for now (don't add the new coder)
  • We can file an issue to follow-up on changing this

(I gathered some context by looking at #8370, and also spoke to @lukecwik)
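
The resolution behaviour described in this comment (first registered provider wins, with fall-through only when a provider signals it cannot provide) can be modelled in a few lines. This is a sketch of the idea only, not Beam's CoderRegistry: `FirstMatchRegistrySketch`, `Provider`, and `CannotProvideException` are hypothetical names, and coders are represented by plain strings to keep the example self-contained.

```java
import java.util.List;

/** Illustrative model only: ordered providers, first successful answer wins. */
final class FirstMatchRegistrySketch {

  /** Hypothetical stand-in for a "cannot provide a coder" signal. */
  static final class CannotProvideException extends Exception {
    CannotProvideException(String message) {
      super(message);
    }
  }

  /** A provider either names a coder for the type or signals that it cannot. */
  interface Provider {
    String coderFor(Class<?> type) throws CannotProvideException;
  }

  /** Builds a provider that answers for exactly one type. */
  static Provider forCoder(Class<?> supported, String coderName) {
    return type -> {
      if (!supported.equals(type)) {
        throw new CannotProvideException(coderName + " does not handle " + type.getName());
      }
      return coderName;
    };
  }

  /** Walks the providers in registration order and returns the first answer. */
  static String resolve(List<Provider> providers, Class<?> type) {
    for (Provider provider : providers) {
      try {
        return provider.coderFor(type);
      } catch (CannotProvideException e) {
        // This provider declined; try the next one in the list.
      }
    }
    throw new IllegalStateException("No coder registered for " + type.getName());
  }
}
```

Under this model, a second provider registered for the same type is never consulted, because the first one always succeeds. That matches the observation that the extra registrations are effectively a no-op and that swapping the order changes which coder every caller gets.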

Member

Thanks for the info @TheNeuralBit , this is super helpful context. Sounds like what's here then is not going to break anything, which is the most important.

One aspect that I predict will be confusing for users is that if one attempts to write Pub/Sub messages with an ordering key, the PubsubMessageWithAttributesCoder will be used by default unless setCoder is called; in that default case, the orderingKey will be "mysteriously" stripped away by the coder. I suppose we might also need a new write helper method like writeMessagesWithOrderingKeys, in addition to writeMessages, in order to help users set the correct coder?
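
The "mysteriously stripped" effect follows directly from what a codec chooses to write. The sketch below is a simplified model using plain java.io, not any of the Pub/Sub coders discussed here; `DroppedFieldSketch` and its method names are hypothetical. A round trip through an encoding that only knows about the payload loses the ordering key, while one that writes both fields preserves it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustrative model only: a field the encoding ignores does not survive a round trip. */
final class DroppedFieldSketch {

  static final class Message {
    final String payload;
    final String orderingKey; // null when absent

    Message(String payload, String orderingKey) {
      this.payload = payload;
      this.orderingKey = orderingKey;
    }
  }

  /** Encodes only the payload, so the decoded message has no ordering key. */
  static Message roundTripPayloadOnly(Message message) {
    byte[] encoded = message.payload.getBytes(StandardCharsets.UTF_8);
    return new Message(new String(encoded, StandardCharsets.UTF_8), null);
  }

  /** Encodes both fields; an absent key is written as the empty string. */
  static Message roundTripWithOrderingKey(Message message) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(message.payload);
      out.writeUTF(message.orderingKey == null ? "" : message.orderingKey);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      String payload = in.readUTF();
      String orderingKey = in.readUTF();
      return new Message(payload, orderingKey);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Nothing fails in the payload-only path, which is why the loss is easy to miss: the message decodes cleanly, just without the key.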

Member

Filed #23525 for the follow-up

@gemelen
Contributor Author

gemelen commented Aug 18, 2022

@damccorm Could you please help to find a reviewer?

@damccorm
Contributor

@johnjcasey mind taking another look?

Contributor

@johnjcasey johnjcasey left a comment

LGTM

@johnjcasey
Contributor

Though it would be good to get a better understanding of coders first. I think @TheNeuralBit has some more experience there

@TheNeuralBit
Member

> Though it would be good to get a better understanding of coders first. I think @TheNeuralBit has some more experience there

Is there a specific question I can help with?

@egalpin
Member

egalpin commented Sep 29, 2022

Friendly bump, anything outstanding here or is this ready to be merged? (cc @johnjcasey @TheNeuralBit )

@TheNeuralBit
Member

> Friendly bump, anything outstanding here or is this ready to be merged? (cc @johnjcasey @TheNeuralBit )

Apologies. I think this is ready to merge once the merge conflict is resolved.

I will file an issue to track the default coder issue

@egalpin
Member

egalpin commented Oct 7, 2022

thanks @TheNeuralBit ! I merged in master + resolved conflicts. For future reference, is merge or rebase preferred here? I opted for merge so that prior commits would not be marked as new for reviewers.

@egalpin egalpin merged commit 1d573e2 into apache:master Oct 11, 2022
@egalpin
Member

egalpin commented Oct 11, 2022

Thank you @gemelen for kicking off this high-value addition to PubsubIO! 🎉

@gemelen
Contributor Author

gemelen commented Oct 11, 2022

> Thank you @gemelen for kicking off this high-value addition to PubsubIO! 🎉

You deserve thanks for that changeset more than anybody else.

@gemelen gemelen deleted the beam-13592-pubsub-java-orderingkey branch October 11, 2022 14:41
@gemelen
Contributor Author

gemelen commented Oct 11, 2022

Oh, last thing is to update https://issues.apache.org/jira/browse/BEAM-13592

@TheNeuralBit
Member

> thanks @TheNeuralBit ! I merged in master + resolved conflicts. For future reference, is merge or rebase preferred here? I opted for merge so that prior commits would not be marked as new for reviewers.

I think this is just up to personal preference :)
