diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 0711b70da512..bbf5f898b5e2 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -61,29 +61,43 @@ class PubsubMessage(object): attributes: (dict) Key-value map of str to str, containing both user-defined and service generated attributes (such as id_label and timestamp_attribute). May be None. + message_id: (string) Message_Id as generated by PubSub + publish_time: (timestamp) Published time of message as generated by PubSub + as per google.protobuf.timestamp_pb2.Timestamp + + TODO(BEAM-7819): message_id and publish_time are not populated + on Dataflow runner """ - def __init__(self, data, attributes): + def __init__(self, data, attributes, message_id=None, publish_time=None): if data is None and not attributes: raise ValueError('Either data (%r) or attributes (%r) must be set.', data, attributes) self.data = data self.attributes = attributes + self.message_id = message_id + self.publish_time = publish_time def __hash__(self): - return hash((self.data, frozenset(self.attributes.items()))) + return hash((self.data, frozenset(self.attributes.items()), + self.message_id, self.publish_time.seconds, + self.publish_time.nanos)) def __eq__(self, other): return isinstance(other, PubsubMessage) and ( self.data == other.data and - self.attributes == other.attributes) + self.attributes == other.attributes and + self.message_id == other.message_id and + self.publish_time == other.publish_time) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): - return 'PubsubMessage(%s, %s)' % (self.data, self.attributes) + return 'PubsubMessage(%s, %s, %s, %s)' % (self.data, self.attributes, + self.message_id, + self.publish_time) @staticmethod def _from_proto_str(proto_msg): @@ -100,7 +114,8 @@ def _from_proto_str(proto_msg): msg.ParseFromString(proto_msg) # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) - return PubsubMessage(msg.data, attributes) + + return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time) def _to_proto_str(self): """Get serialized form of ``PubsubMessage``. @@ -127,7 +142,8 @@ def _from_message(msg): """ # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) - return PubsubMessage(msg.data, attributes) + + return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time) class ReadFromPubSub(PTransform): @@ -391,7 +407,6 @@ def __init__(self, topic, id_label, with_attributes, timestamp_attribute): self.id_label = id_label self.with_attributes = with_attributes self.timestamp_attribute = timestamp_attribute - self.project, self.topic_name = parse_topic(topic) @property diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index b1dac60cd8d4..4947b1df821b 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -52,6 +52,7 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils import timestamp +from google.protobuf.timestamp_pb2 import Timestamp # Protect against environments where the PubSub library is not available. try: from google.cloud import pubsub @@ -82,28 +83,55 @@ def test_proto_conversion(self): self.assertEqual(m_converted.attributes, attributes) def test_eq(self): - a = PubsubMessage(b'abc', {1: 2, 3: 4}) - b = PubsubMessage(b'abc', {1: 2, 3: 4}) - c = PubsubMessage(b'abc', {1: 2}) + publish_time = Timestamp(seconds=1520861821, + nanos=234567000) + a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time) + d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time) + e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', + Timestamp(seconds=0, nanos=0)) self.assertTrue(a == b) self.assertTrue(a != c) self.assertTrue(b != c) + self.assertTrue(a != d) + self.assertTrue(b != d) + self.assertTrue(a != e) + self.assertTrue(b != e) def test_hash(self): - a = PubsubMessage(b'abc', {1: 2, 3: 4}) - b = PubsubMessage(b'abc', {1: 2, 3: 4}) - c = PubsubMessage(b'abc', {1: 2}) + publish_time = Timestamp(seconds=1520861821, + nanos=234567000) + a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time) + d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time) + e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', + Timestamp(seconds=0, nanos=0)) self.assertTrue(hash(a) == hash(b)) self.assertTrue(hash(a) != hash(c)) self.assertTrue(hash(b) != hash(c)) + self.assertTrue(hash(a) != hash(d)) + self.assertTrue(hash(b) != hash(d)) + self.assertTrue(hash(a) != hash(e)) + self.assertTrue(hash(b) != hash(e)) def test_repr(self): - a = PubsubMessage(b'abc', {1: 2, 3: 4}) - b = PubsubMessage(b'abc', {1: 2, 3: 4}) - c = PubsubMessage(b'abc', {1: 2}) + publish_time = Timestamp(seconds=1520861821, + nanos=234567000) + a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time) + c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time) + d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time) + e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', + Timestamp(seconds=0, nanos=0)) self.assertTrue(repr(a) == repr(b)) self.assertTrue(repr(a) != repr(c)) self.assertTrue(repr(b) != repr(c)) + self.assertTrue(repr(a) != repr(d)) + self.assertTrue(repr(b) != repr(d)) + self.assertTrue(repr(a) != repr(e)) + self.assertTrue(repr(b) != repr(e)) @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') @@ -338,12 +366,17 @@ def test_read_messages_success(self, mock_pubsub): publish_time_nanos = 234567000 attributes = {'key': 'value'} ack_id = 'ack_id' + message_id = '0123456789' + publish_time = Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) pull_response = test_utils.create_pull_response([ test_utils.PullResponseMessage( - data, attributes, publish_time_secs, publish_time_nanos, ack_id) + data, attributes, message_id, publish_time, + publish_time_secs, publish_time_nanos, ack_id) ]) expected_elements = [ - TestWindowedValue(PubsubMessage(data, attributes), + TestWindowedValue(PubsubMessage(data, attributes, + message_id, publish_time), timestamp.Timestamp(1520861821.234567), [window.GlobalWindow()])] mock_pubsub.return_value.pull.return_value = pull_response @@ -413,13 +446,17 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub): publish_time_secs = 1520861821 publish_time_nanos = 234567000 ack_id = 'ack_id' + message_id = '0123456789' + publish_time = Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) pull_response = test_utils.create_pull_response([ test_utils.PullResponseMessage( - data, attributes, publish_time_secs, publish_time_nanos, ack_id) + data, attributes, message_id, publish_time, + publish_time_secs, publish_time_nanos, ack_id) ]) expected_elements = [ TestWindowedValue( - PubsubMessage(data, attributes), + PubsubMessage(data, attributes, message_id, publish_time), timestamp.Timestamp(micros=int(attributes['time']) * 1000), [window.GlobalWindow()]), ] @@ -446,15 +483,20 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub): publish_time_secs = 1337000000 publish_time_nanos = 133700000 ack_id = 'ack_id' + message_id = '0123456789' + publish_time = Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) pull_response = test_utils.create_pull_response([ test_utils.PullResponseMessage( - data, attributes, publish_time_secs, publish_time_nanos, ack_id) + data, attributes, message_id, publish_time, + publish_time_secs, publish_time_nanos, ack_id) ]) expected_elements = [ TestWindowedValue( - PubsubMessage(data, attributes), + PubsubMessage(data, attributes, + message_id, publish_time), timestamp.Timestamp.from_rfc3339(attributes['time']), - [window.GlobalWindow()]), + [window.GlobalWindow()]) ] mock_pubsub.return_value.pull.return_value = pull_response @@ -480,13 +522,18 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub): publish_time_nanos = 234567000 publish_time = '2018-03-12T13:37:01.234567Z' ack_id = 'ack_id' + pubsub_publish_time = Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) + message_id = '0123456789' pull_response = test_utils.create_pull_response([ test_utils.PullResponseMessage( - data, attributes, publish_time_secs, publish_time_nanos, ack_id) + data, attributes, message_id, pubsub_publish_time, + publish_time_secs, publish_time_nanos, ack_id) ]) expected_elements = [ TestWindowedValue( - PubsubMessage(data, attributes), + PubsubMessage(data, attributes, + message_id, pubsub_publish_time), timestamp.Timestamp.from_rfc3339(publish_time), [window.GlobalWindow()]), ] @@ -513,9 +560,13 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub): publish_time_secs = 1520861821 publish_time_nanos = 234567000 ack_id = 'ack_id' + message_id = '0123456789' + publish_time = Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) pull_response = test_utils.create_pull_response([ test_utils.PullResponseMessage( - data, attributes, publish_time_secs, publish_time_nanos, ack_id) + data, attributes, message_id, publish_time, + publish_time_secs, publish_time_nanos, ack_id) ]) mock_pubsub.return_value.pull.return_value = pull_response diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py index cb9fbb99ff64..eb2915725e91 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py @@ -34,11 +34,15 @@ # Protect against environments where pubsub library is not available. try: from google.cloud import pubsub + from google.protobuf.timestamp_pb2 import Timestamp except ImportError: pubsub = None + Timestamp = None @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.') +@unittest.skipIf(Timestamp is None, + 'Google Protobuf dependencies are not installed.') @mock.patch('time.sleep', return_value=None) @mock.patch('google.cloud.pubsub.SubscriberClient') class PubSubMatcherTest(unittest.TestCase): @@ -71,10 +75,14 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock): def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True) - self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ - create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + create_pull_response([PullResponseMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())]) ] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) @@ -82,11 +90,15 @@ def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock): def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True) - self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value # Unexpected attribute 'k'. mock_sub.pull.side_effect = [ - create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + create_pull_response([PullResponseMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())]) ] with self.assertRaisesRegexp(AssertionError, r'Unexpected'): hc_assert_that(self.mock_presult, self.pubsub_matcher) @@ -96,10 +108,13 @@ def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock): def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) - self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [create_pull_response([ - PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'}) + PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'}, + '0123456789', Timestamp()) ])] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) @@ -108,11 +123,14 @@ def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock): def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock): self.init_matcher(with_attributes=True, strip_attributes=['id', 'timestamp']) - self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})] + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value # Message is missing attribute 'timestamp'. mock_sub.pull.side_effect = [create_pull_response([ - PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'}) + PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'}, + '0123456789', Timestamp()) ])] with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'): hc_assert_that(self.mock_presult, self.pubsub_matcher) @@ -124,8 +142,10 @@ def test_message_matcher_mismatch(self, mock_get_sub, unused_mock): self.pubsub_matcher.expected_msg = [b'a'] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ - create_pull_response([PullResponseMessage(b'c', {}), - PullResponseMessage(b'd', {})]), + create_pull_response([PullResponseMessage(b'c', {}, '01', + Timestamp()), + PullResponseMessage(b'd', {}, '02', + Timestamp())]), ] with self.assertRaises(AssertionError) as error: hc_assert_that(self.mock_presult, self.pubsub_matcher) diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py b/sdks/python/apache_beam/io/gcp/tests/utils_test.py index c9e96d127127..d688ed615388 100644 --- a/sdks/python/apache_beam/io/gcp/tests/utils_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py @@ -33,10 +33,12 @@ from google.api_core import exceptions as gexc from google.cloud import bigquery from google.cloud import pubsub + from google.protobuf.timestamp_pb2 import Timestamp except ImportError: gexc = None bigquery = None pubsub = None + Timestamp = None @unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.') @@ -162,9 +164,18 @@ def test_read_from_pubsub_with_attributes(self): data = b'data' ack_id = 'ack_id' attributes = {'key': 'value'} - message = PubsubMessage(data, attributes) + message_id = '0123456789' + publish_seconds = 1520861821 + publish_nanos = 234567000 + publish_time = Timestamp(seconds=publish_seconds, nanos=publish_nanos) + message = PubsubMessage(data, attributes, message_id, publish_time) pull_response = test_utils.create_pull_response( - [test_utils.PullResponseMessage(data, attributes, ack_id=ack_id)]) + [test_utils.PullResponseMessage(data, attributes, + message_id=message_id, + publish_time=publish_time, + publish_time_secs=publish_seconds, + publish_time_nanos=publish_nanos, + ack_id=ack_id)]) mock_pubsub.pull.return_value = pull_response output = utils.read_from_pubsub( mock_pubsub, @@ -213,16 +224,30 @@ def test_read_from_pubsub_many(self): 'data {}'.format(i).encode("utf-8") for i in range(number_of_elements) ] attributes_list = [{ - 'key': 'value {}'.format(i) - } for i in range(number_of_elements)] + 'key': 'value {}'.format(i)} for i in range(number_of_elements)] + message_id_list = ['0123456789_{}'.format(i) + for i in range(number_of_elements)] + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 + publish_time_list = [Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) + for i in range(number_of_elements)] ack_ids = ['ack_id_{}'.format(i) for i in range(number_of_elements)] messages = [ - PubsubMessage(data, attributes) - for data, attributes in zip(data_list, attributes_list) + PubsubMessage(data, attributes, message_id, publish_time) + for data, attributes, message_id, publish_time + in zip(data_list, attributes_list, + message_id_list, publish_time_list) ] response_messages = [ - test_utils.PullResponseMessage(data, attributes, ack_id=ack_id) - for data, attributes, ack_id in zip(data_list, attributes_list, ack_ids) + test_utils.PullResponseMessage( + data, attributes, message_id, + publish_time_secs=publish_time.seconds, + publish_time_nanos=publish_time.nanos, + ack_id=ack_id) + for data, attributes, message_id, publish_time, ack_id + in zip(data_list, attributes_list, message_id_list, + publish_time_list, ack_ids) ] class SequentialPullResponse(object): diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py index f9aa128d0585..a407c789e54f 100644 --- a/sdks/python/apache_beam/testing/test_utils.py +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -154,10 +154,13 @@ class PullResponseMessage(object): Utility class for ``create_pull_response``. """ - def __init__(self, data, attributes=None, - publish_time_secs=None, publish_time_nanos=None, ack_id=None): + def __init__(self, data, attributes=None, message_id=None, + publish_time=None, publish_time_secs=None, + publish_time_nanos=None, ack_id=None): self.data = data self.attributes = attributes + self.message_id = message_id + self.publish_time = publish_time self.publish_time_secs = publish_time_secs self.publish_time_nanos = publish_time_nanos self.ack_id = ack_id @@ -191,6 +194,9 @@ def create_pull_response(responses): if response.publish_time_nanos is not None: message.publish_time.nanos = response.publish_time_nanos + if response.message_id is not None: + message.message_id = response.message_id + if response.ack_id is not None: received_message.ack_id = response.ack_id diff --git a/sdks/python/scripts/run_pylint.sh b/sdks/python/scripts/run_pylint.sh index 8a4b5e1061ae..a3f20d681958 100755 --- a/sdks/python/scripts/run_pylint.sh +++ b/sdks/python/scripts/run_pylint.sh @@ -104,6 +104,7 @@ ISORT_EXCLUDED=( "model.py" "taxi.py" "process_tfma.py" + "pubsub_test.py" ) SKIP_PARAM="" for file in "${ISORT_EXCLUDED[@]}"; do