Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,29 @@ class PubsubMessage(object):
attributes: (dict) Key-value map of str to str, containing both user-defined
and service generated attributes (such as id_label and
timestamp_attribute). May be None.
message_id: (string) Message_Id as generated by PubSub
publish_time: (timestamp) Published time of message as generated by PubSub
as per google.protobuf.timestamp_pb2.Timestamp

TODO(BEAM-7819): message_id and publish_time are not populated
on Dataflow runner
"""

def __init__(self, data, attributes, message_id=None, publish_time=None):
def __init__(self, data, attributes):
if data is None and not attributes:
raise ValueError('Either data (%r) or attributes (%r) must be set.',
data, attributes)
self.data = data
self.attributes = attributes
self.message_id = message_id
self.publish_time = publish_time

def __hash__(self):
return hash((self.data, frozenset(self.attributes.items()),
self.message_id, self.publish_time.seconds,
self.publish_time.nanos))
return hash((self.data, frozenset(self.attributes.items())))

def __eq__(self, other):
return isinstance(other, PubsubMessage) and (
self.data == other.data and
self.attributes == other.attributes and
self.message_id == other.message_id and
self.publish_time == other.publish_time)
self.attributes == other.attributes)

def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other

def __repr__(self):
return 'PubsubMessage(%s, %s, %s, %s)' % (self.data, self.attributes,
self.message_id,
self.publish_time)
return 'PubsubMessage(%s, %s)' % (self.data, self.attributes)

@staticmethod
def _from_proto_str(proto_msg):
Expand All @@ -114,8 +100,7 @@ def _from_proto_str(proto_msg):
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)

return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time)
return PubsubMessage(msg.data, attributes)

def _to_proto_str(self):
"""Get serialized form of ``PubsubMessage``.
Expand All @@ -142,8 +127,7 @@ def _from_message(msg):
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)

return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time)
return PubsubMessage(msg.data, attributes)


class ReadFromPubSub(PTransform):
Expand Down Expand Up @@ -407,6 +391,7 @@ def __init__(self, topic, id_label, with_attributes, timestamp_attribute):
self.id_label = id_label
self.with_attributes = with_attributes
self.timestamp_attribute = timestamp_attribute

self.project, self.topic_name = parse_topic(topic)

@property
Expand Down
89 changes: 19 additions & 70 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.utils import timestamp

from google.protobuf.timestamp_pb2 import Timestamp
# Protect against environments where the PubSub library is not available.
try:
from google.cloud import pubsub
Expand Down Expand Up @@ -83,55 +82,28 @@ def test_proto_conversion(self):
self.assertEqual(m_converted.attributes, attributes)

def test_eq(self):
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(a == b)
self.assertTrue(a != c)
self.assertTrue(b != c)
self.assertTrue(a != d)
self.assertTrue(b != d)
self.assertTrue(a != e)
self.assertTrue(b != e)

def test_hash(self):
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(hash(a) == hash(b))
self.assertTrue(hash(a) != hash(c))
self.assertTrue(hash(b) != hash(c))
self.assertTrue(hash(a) != hash(d))
self.assertTrue(hash(b) != hash(d))
self.assertTrue(hash(a) != hash(e))
self.assertTrue(hash(b) != hash(e))

def test_repr(self):
publish_time = Timestamp(seconds=1520861821,
nanos=234567000)
a = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
b = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890', publish_time)
c = PubsubMessage(b'abc', {1: 2}, '1234567890', publish_time)
d = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234', publish_time)
e = PubsubMessage(b'abc', {1: 2, 3: 4}, '1234567890',
Timestamp(seconds=0, nanos=0))
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(repr(a) == repr(b))
self.assertTrue(repr(a) != repr(c))
self.assertTrue(repr(b) != repr(c))
self.assertTrue(repr(a) != repr(d))
self.assertTrue(repr(b) != repr(d))
self.assertTrue(repr(a) != repr(e))
self.assertTrue(repr(b) != repr(e))


@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
Expand Down Expand Up @@ -366,17 +338,12 @@ def test_read_messages_success(self, mock_pubsub):
publish_time_nanos = 234567000
attributes = {'key': 'value'}
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(PubsubMessage(data, attributes,
message_id, publish_time),
TestWindowedValue(PubsubMessage(data, attributes),
timestamp.Timestamp(1520861821.234567),
[window.GlobalWindow()])]
mock_pubsub.return_value.pull.return_value = pull_response
Expand Down Expand Up @@ -446,17 +413,13 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes, message_id, publish_time),
PubsubMessage(data, attributes),
timestamp.Timestamp(micros=int(attributes['time']) * 1000),
[window.GlobalWindow()]),
]
Expand All @@ -483,20 +446,15 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
publish_time_secs = 1337000000
publish_time_nanos = 133700000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes,
message_id, publish_time),
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(attributes['time']),
[window.GlobalWindow()])
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response

Expand All @@ -522,18 +480,13 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
publish_time_nanos = 234567000
publish_time = '2018-03-12T13:37:01.234567Z'
ack_id = 'ack_id'
pubsub_publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
message_id = '0123456789'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, message_id, pubsub_publish_time,
publish_time_secs, publish_time_nanos, ack_id)
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes,
message_id, pubsub_publish_time),
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(publish_time),
[window.GlobalWindow()]),
]
Expand All @@ -560,13 +513,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
message_id = '0123456789'
publish_time = Timestamp(seconds=publish_time_secs,
nanos=publish_time_nanos)
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, message_id, publish_time,
publish_time_secs, publish_time_nanos, ack_id)
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
mock_pubsub.return_value.pull.return_value = pull_response

Expand Down
40 changes: 10 additions & 30 deletions sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@
# Protect against environments where pubsub library is not available.
try:
from google.cloud import pubsub
from google.protobuf.timestamp_pb2 import Timestamp
except ImportError:
pubsub = None
Timestamp = None


@unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
@unittest.skipIf(Timestamp is None,
'Google Protobuf dependencies are not installed.')
@mock.patch('time.sleep', return_value=None)
@mock.patch('google.cloud.pubsub.SubscriberClient')
class PubSubMatcherTest(unittest.TestCase):
Expand Down Expand Up @@ -75,30 +71,22 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock):

def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())])
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
self.assertEqual(mock_sub.acknowledge.call_count, 1)

def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {},
'0123456789',
Timestamp())]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {})]
mock_sub = mock_get_sub.return_value
# Unexpected attribute 'k'.
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())])
create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
]
with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand All @@ -108,13 +96,10 @@ def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'},
'0123456789', Timestamp())
PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
])]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
Expand All @@ -123,14 +108,11 @@ def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'},
'0123456789',
Timestamp())]
self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
# Message is missing attribute 'timestamp'.
mock_sub.pull.side_effect = [create_pull_response([
PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'},
'0123456789', Timestamp())
PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
])]
with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand All @@ -142,10 +124,8 @@ def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
self.pubsub_matcher.expected_msg = [b'a']
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
create_pull_response([PullResponseMessage(b'c', {}, '01',
Timestamp()),
PullResponseMessage(b'd', {}, '02',
Timestamp())]),
create_pull_response([PullResponseMessage(b'c', {}),
PullResponseMessage(b'd', {})]),
]
with self.assertRaises(AssertionError) as error:
hc_assert_that(self.mock_presult, self.pubsub_matcher)
Expand Down
Loading