Skip to content

Conversation

@matt-darwin
Copy link

According to the documentation, system generated metadata should be in the attributes property of the PubSub message, however this is not the case for message_id which is a seperate property of the protobuf PubSub message.

Parse this and add to the attributes dictionary of the beam PubSub message.

Amend tests to expect message_id k,v to be in the attributes property.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [X ] Choose reviewer(s) and mention them in a comment (R: @username).
  • [ X] Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • [ X] If this contribution is large, please file an Apache Individual Contributor License Agreement.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@matt-darwin
Copy link
Author

@aaltay @udim

Copy link
Member

@aaltay aaltay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you are ready, let's also run a test on Dataflow. Since Dataflow and Direct runner has different pubsub implementations.

Matthew Darwin added 3 commits August 3, 2019 13:53
@lukecwik
Copy link
Member

lukecwik commented Aug 5, 2019

FYI, there is #8370 which is attempting to add the message id to the Beam Java SDK.

@udim
Copy link
Member

udim commented Aug 5, 2019

Note that in Dataflow, the Python SDK uses FnAPI to read from Pub/Sub using a Java harness. In other words, the message id might be missing because Java code doesn't provide it somehow. I'll look into it later.

When running locally using DirectRunner, there's a different implementation that replaces ReadFromPubSub entirely, which probably needs modification as well. See:

class _PubSubReadEvaluator(_TransformEvaluator):

@aaltay aaltay requested a review from udim August 6, 2019 00:11
@matt-darwin
Copy link
Author

matt-darwin commented Aug 7, 2019

FYI, there is #8370 which is attempting to add the message id to the Beam Java SDK.

I'll have a look at building the runner from this change to see if I can then access the message_id from the dataflow runner. I'm not adept at all at Java however, so wouldn't be much help there.

@matt-darwin
Copy link
Author

Note that in Dataflow, the Python SDK uses FnAPI to read from Pub/Sub using a Java harness. In other words, the message id might be missing because Java code doesn't provide it somehow. I'll look into it later.

When running locally using DirectRunner, there's a different implementation that replaces ReadFromPubSub entirely, which probably needs modification as well. See:

class _PubSubReadEvaluator(_TransformEvaluator):

The directrunner has been working ok with the above changes; I think the issue is on the dataflow runner side. The directrunner is using the _from_message method, and this is parsing correctly and returning the pubsub message id in my testing so far.

def _read_from_pubsub(self, timestamp_attribute):
   from apache_beam.io.gcp.pubsub import PubsubMessage
   from google.cloud import pubsub

   def _get_element(message):
     parsed_message = PubsubMessage._from_message(message)

@matt-darwin matt-darwin marked this pull request as ready for review August 8, 2019 16:49
@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

1 similar comment
@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

@matt-darwin
Copy link
Author

Need to fix up some tests, and the PubSubMessage tests need amending to be in line with those. Haven't had chance to test with the dataflowrunner as yet.

@matt-darwin
Copy link
Author

Ok, so the PubSub timestamp is actually the google.protobuf.timestamp_pb2.Timestamp type. Have amended everything to expect this for the publish_time attribute

@udim
Copy link
Member

udim commented Aug 16, 2019

FYI, support for message_id and publish_time in Dataflow should be available in a future update of Dataflow. Until then, those fields will appear unset or blank.

…t import orders; remove pylint exclusion; correct message_id TODO comment for BEAM7819
@matt-darwin
Copy link
Author

FYI, support for message_id and publish_time in Dataflow should be available in a future update of Dataflow. Until then, those fields will appear unset or blank.

Is that external to the Beam project?

@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

@udim
Copy link
Member

udim commented Aug 16, 2019

Is that external to the Beam project?

Yes

@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

@matt-darwin
Copy link
Author

matt-darwin commented Aug 17, 2019

Back to the isort error now:


Running isort...
--
ERROR: /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/io/gcp/pubsub_test.py Imports are incorrectly sorted.
--- /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/io/gcp/pubsub_test.py:before	2019-08-17 06:59:14.752504
+++ /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/io/gcp/pubsub_test.py:after	2019-08-17 07:01:17.704126
@@ -26,6 +26,7 @@
 
import hamcrest as hc
import mock
+from google.protobuf.timestamp_pb2 import Timestamp
 
import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -52,7 +53,6 @@
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.utils import timestamp
 
-from google.protobuf.timestamp_pb2 import Timestamp
# Protect against environments where the PubSub library is not available.
try:
from google.cloud import pubsub
Command exited with non-zero status 1

@aaltay are you happy for the file to be excluded? If I make the isort recommended change, it then errors due to grouping on the google side.

The issue is if I leave the import in the try block, I get None exceptions all over the place for the timestamp object. Or is this simply because the try block for imports is triggered by a requirement to import the first module in the block and I have them in the wrong order?

@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

@aaltay
Copy link
Member

aaltay commented Aug 20, 2019

@matt-darwin - Re: isort - Sure, it is fine to exclude it from isort. Thank you for the explanation.

@matt-darwin
Copy link
Author

@aaltay and @udim was there anything further you wanted changing on this?

@matt-darwin
Copy link
Author

Run Python Dataflow ValidatesRunner

Copy link
Member

@aaltay aaltay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thank you @matt-darwin.

We can merge it once the other reviewers also complete their reviews.

@aaltay
Copy link
Member

aaltay commented Sep 3, 2019

@udim do you have additional comments? Can we merge this?

Copy link
Member

@udim udim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One comment in run_pylint.sh, otherwise LGTM

@aaltay
Copy link
Member

aaltay commented Sep 4, 2019

The last comment was also addressed. Merging this.

@aaltay aaltay merged commit 0b5beac into apache:master Sep 4, 2019
@udim
Copy link
Member

udim commented Sep 5, 2019

Post-commits are failing, opened: https://issues.apache.org/jira/browse/BEAM-8153

@aaltay
Copy link
Member

aaltay commented Sep 5, 2019

Thank you @udim. Sent you #9479 to revert this PR. We can revert first than address the test issue.

soyrice pushed a commit to soyrice/beam that referenced this pull request Sep 19, 2019
…apache#9232)

[BEAM-7819] Python - parse PubSub message_id into attributes property (apache#9232)
soyrice pushed a commit to soyrice/beam that referenced this pull request Sep 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants