Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bc8f591
Parse message_id from PubSub message, amend test to add to attribute …
Jul 29, 2019
425a54a
amend matcher tests to expect message_id in attributes
Jul 31, 2019
45107e1
Merge branch 'master' into BEAM-7819-parse-pubsub-message_id
Aug 2, 2019
d675469
fix lint issues, too long lines
Aug 3, 2019
1e8713a
fix lint issues, trailing whitespaces and indentation. Remove check f…
Aug 3, 2019
0729b88
remove erroneous message attribute check
Aug 5, 2019
2928ac2
Add additional blank line to fix lint issue
Aug 8, 2019
56af1df
Merge branch 'master' into BEAM-7819-parse-pubsub-message_id
Aug 8, 2019
5ed7398
fix test_read_from_pubsub_with_attributes and test_read_from_pubsub_many
Aug 8, 2019
a0366a3
remove eroneous carriage return
Aug 8, 2019
8534cc3
move message_id and publish_time to pubsubmessage attributes
Aug 9, 2019
a9247eb
Pubsub message corrected
Aug 9, 2019
d6b4dea
Broken tests, directrunner ok
Aug 9, 2019
ef1a4e0
add message_id to pubsubmessage object
Aug 11, 2019
821101e
Amend to expect google.protobuf.timestamp_pb2.Timestamp for publish_t…
Aug 12, 2019
cbc8cdc
fix lint issues
Aug 12, 2019
86ba8c2
amend imports, add exclusion to isort for google grouping issues for …
Aug 12, 2019
32b8b8f
amend furter lint problems
Aug 12, 2019
085d71d
remove erroneous carriage returns
Aug 12, 2019
bdda808
Correctly initialise publish_time, amend matcher tests
Aug 12, 2019
a377d4f
fixing matcher and utils tests
Aug 12, 2019
516c9d0
fix test pubsub many test
Aug 13, 2019
b988939
fix lint issues
Aug 13, 2019
037e9cb
Try and sort out NoneType errors
Aug 13, 2019
979bbcf
Amend init for pubsubmessage
Aug 13, 2019
b327604
revert using protobuf.Timestamp within pubsubmessage
Aug 13, 2019
dfd92ba
add additional equality tests for message_id and publish_time; correc…
Aug 16, 2019
3b9788a
Move timestamp protobug import outside try/catch
Aug 17, 2019
0419852
Remove whitespace
Aug 17, 2019
4304abc
Import sorting to resolve isort errors
Aug 17, 2019
dc22643
Move Timestamp import back out try block
Aug 17, 2019
c75d894
Add pubsub_test to isort exclusion
Aug 17, 2019
f67dc00
Fix whitespace
Aug 17, 2019
0f6b5f9
Merge branch 'master' into BEAM-7819-parse-pubsub-message_id
Aug 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,43 @@ class PubsubMessage(object):
attributes: (dict) Key-value map of str to str, containing both user-defined
and service generated attributes (such as id_label and
timestamp_attribute). May be None.
message_id: (string) Message_Id as generated by PubSub
publish_time: (timestamp) Published time of message as generated by PubSub
as per google.protobuf.timestamp_pb2.Timestamp

TODO(BEAM-7819): message_id and publish_time are not populated
on Dataflow runner
"""

def __init__(self, data, attributes):
def __init__(self, data, attributes, message_id=None, publish_time=None):
if data is None and not attributes:
raise ValueError('Either data (%r) or attributes (%r) must be set.',
data, attributes)
self.data = data
self.attributes = attributes
self.message_id = message_id
self.publish_time = publish_time

def __hash__(self):
return hash((self.data, frozenset(self.attributes.items())))
return hash((self.data, frozenset(self.attributes.items()),
self.message_id, self.publish_time.seconds,
self.publish_time.nanos))

def __eq__(self, other):
return isinstance(other, PubsubMessage) and (
self.data == other.data and
self.attributes == other.attributes)
self.attributes == other.attributes and
self.message_id == other.message_id and
self.publish_time == other.publish_time)

def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other

def __repr__(self):
return 'PubsubMessage(%s, %s)' % (self.data, self.attributes)
return 'PubsubMessage(%s, %s, %s, %s)' % (self.data, self.attributes,
self.message_id,
self.publish_time)

@staticmethod
def _from_proto_str(proto_msg):
Expand All @@ -100,7 +114,8 @@ def _from_proto_str(proto_msg):
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
return PubsubMessage(msg.data, attributes)

return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time)

def _to_proto_str(self):
"""Get serialized form of ``PubsubMessage``.
Expand All @@ -127,7 +142,8 @@ def _from_message(msg):
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
return PubsubMessage(msg.data, attributes)

return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time)


class ReadFromPubSub(PTransform):
Expand Down Expand Up @@ -391,7 +407,6 @@ def __init__(self, topic, id_label, with_attributes, timestamp_attribute):
self.id_label = id_label
self.with_attributes = with_attributes
self.timestamp_attribute = timestamp_attribute

self.project, self.topic_name = parse_topic(topic)

@property
Expand Down
89 changes: 70 additions & 19 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.utils import timestamp

from google.protobuf.timestamp_pb2 import Timestamp
# Protect against environments where the PubSub library is not available.
try:
from google.cloud import pubsub
Expand Down Expand Up @@ -82,28 +83,55 @@ def test_proto_conversion(self):
self.assertEqual(m_converted.attributes, attributes)

def test_eq(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
self.assertTrue(a == b)
self.assertTrue(a != c)
self.assertTrue(b != c)
self.assertTrue(a != d)
self.assertTrue(b != d)
self.assertTrue(a != e)
self.assertTrue(b != e)

def test_hash(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
self.assertTrue(hash(a) == hash(b))
self.assertTrue(hash(a) != hash(c))
self.assertTrue(hash(b) != hash(c))
self.assertTrue(hash(a) != hash(d))
self.assertTrue(hash(b) != hash(d))
self.assertTrue(hash(a) != hash(e))
self.assertTrue(hash(b) != hash(e))

def test_repr(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
self.assertTrue(repr(a) == repr(b))
self.assertTrue(repr(a) != repr(c))
self.assertTrue(repr(b) != repr(c))
self.assertTrue(repr(a) != repr(d))
self.assertTrue(repr(b) != repr(d))
self.assertTrue(repr(a) != repr(e))
self.assertTrue(repr(b) != repr(e))


@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
Expand Down Expand Up @@ -338,12 +366,17 @@ def test_read_messages_success(self, mock_pubsub):
publish_time_nanos = 234567000
attributes = {'key': 'value'}
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(PubsubMessage(data, attributes),
TestWindowedValue(PubsubMessage(data, attributes,
message_id, publish_time),
timestamp.Timestamp(1520861821.234567),
[window.GlobalWindow()])]
mock_pubsub.return_value.pull.return_value = pull_response
Expand Down Expand Up @@ -413,13 +446,17 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
PubsubMessage(data, attributes, message_id, publish_time),
timestamp.Timestamp(micros=int(attributes['time']) * 1000),
[window.GlobalWindow()]),
]
Expand All @@ -446,15 +483,20 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
publish_time_secs = 1337000000
publish_time_nanos = 133700000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
PubsubMessage(data, attributes,
message_id, publish_time),
timestamp.Timestamp.from_rfc3339(attributes['time']),
[window.GlobalWindow()]),
[window.GlobalWindow()])
]
mock_pubsub.return_value.pull.return_value = pull_response

Expand All @@ -480,13 +522,18 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
publish_time_nanos = 234567000
publish_time = '2018-03-12T13:37:01.234567Z'
ack_id = 'ack_id'
pubsub_publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
message_id = '0123456789'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
data, attributes, message_id, pubsub_publish_time,
publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
PubsubMessage(data, attributes,
message_id, pubsub_publish_time),
timestamp.Timestamp.from_rfc3339(publish_time),
[window.GlobalWindow()]),
]
Expand All @@ -513,9 +560,13 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
])
mock_pubsub.return_value.pull.return_value = pull_response

Expand Down
40 changes: 30 additions & 10 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@
# Protect against environments where pubsub library is not available.
try:
from google.cloud import pubsub
from google.protobuf.timestamp_pb2 import Timestamp
except ImportError:
pubsub = None
Timestamp = None


@unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
@unittest.skipIf(Timestamp is None,
'Google Protobuf dependencies are not installed.')
@mock.patch('time.sleep', return_value=None)
@mock.patch('google.cloud.pubsub.SubscriberClient')
class PubSubMatcherTest(unittest.TestCase):
Expand Down Expand Up @@ -71,22 +75,30 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock):

def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
create_pull_response([PullResponseMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())])
]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {},
'0123456789',
Timestamp())]
mock_sub = mock_get_sub.return_value
# Unexpected attribute 'k'.
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
create_pull_response([PullResponseMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())])
]
with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand All @@ -96,10 +108,13 @@ def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'},
'0123456789', Timestamp())
])]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
Expand All @@ -108,11 +123,14 @@ def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
mock_sub = mock_get_sub.return_value
# Message is missing attribute 'timestamp'.
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'},
'0123456789', Timestamp())
])]
with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand All @@ -124,8 +142,10 @@ def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
self.pubsub_matcher.expected_msg = [b'a']
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'c', {}),
PullResponseMessage(b'd', {})]),
create_pull_response([PullResponseMessage(b'c', {}, '01',
Timestamp()),
PullResponseMessage(b'd', {}, '02',
Timestamp())]),
]
with self.assertRaises(AssertionError) as error:
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand Down
Loading