diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index b0f8bdf03590..e7d82cf4776c 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -69,27 +69,43 @@ class PubsubMessage(object): attributes: (dict) Key-value map of str to str, containing both user-defined and service generated attributes (such as id_label and timestamp_attribute). May be None. + + message_id: (string) Message_Id as generated by PubSub + publish_time: (timestamp) Published time of message as generated by PubSub + as per google.protobuf.timestamp_pb2.Timestamp + + TODO(BEAM-7819): message_id and publish_time are not populated + on Dataflow runner """ - def __init__(self, data, attributes): + def __init__(self, data, attributes, message_id=None, publish_time=None): if data is None and not attributes: raise ValueError( 'Either data (%r) or attributes (%r) must be set.', data, attributes) self.data = data self.attributes = attributes + self.message_id = message_id + self.publish_time = publish_time def __hash__(self): + if self.publish_time is not None: + return hash((self.data, frozenset(self.attributes.items()), + self.message_id, self.publish_time.seconds, + self.publish_time.nanos)) + return hash((self.data, frozenset(self.attributes.items()))) def __eq__(self, other): return isinstance(other, PubsubMessage) and ( - self.data == other.data and self.attributes == other.attributes) + self.attributes == other.attributes and + self.message_id == other.message_id and + self.publish_time == other.publish_time) def __ne__(self, other): # TODO(BEAM-5949): Needed for Python 2 compatibility. return not self == other def __repr__(self): - return 'PubsubMessage(%s, %s)' % (self.data, self.attributes) + return 'PubsubMessage(%s, %s, %s, %s)' % (self.data, self.attributes, self.message_id, self.publish_time) @staticmethod def _from_proto_str(proto_msg): @@ -108,7 +124,7 @@ def _from_proto_str(proto_msg): msg.ParseFromString(proto_msg) # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) - return PubsubMessage(msg.data, attributes) + return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time) def _to_proto_str(self): """Get serialized form of ``PubsubMessage``. @@ -125,6 +141,8 @@ def _to_proto_str(self): msg.data = self.data for key, value in iteritems(self.attributes): msg.attributes[key] = value + msg.publish_time = self.publish_time + msg.message_id = self.message_id return msg.SerializeToString() @staticmethod @@ -137,7 +155,7 @@ def _from_message(msg): """ # Convert ScalarMapContainer to dict. attributes = dict((key, msg.attributes[key]) for key in msg.attributes) - return PubsubMessage(msg.data, attributes) + return PubsubMessage(msg.data, attributes, msg.message_id, msg.publish_time) class ReadFromPubSub(PTransform): diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 703b40f309ef..63d971e0309d 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -674,4 +674,4 @@ def test_write_messages_unsupported_features(self, mock_pubsub): if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) - unittest.main() + unittest.main() \ No newline at end of file diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py index 747f00b5f2cb..2f738e58ee3f 100644 --- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py @@ -17,16 +17,12 @@ """Unit test for PubSub verifier.""" -# pytype: skip-file - from __future__ import absolute_import import logging import sys import unittest -# patches unittest.TestCase to be python3 compatible -import future.tests.base # pylint: disable=unused-import import mock from hamcrest import assert_that as hc_assert_that @@ -35,16 +31,22 @@ from apache_beam.testing.test_utils import PullResponseMessage from apache_beam.testing.test_utils import create_pull_response +# Protect against environments where pubsub library is not available. try: from google.cloud import pubsub + from google.protobuf.timestamp_pb2 import Timestamp except ImportError: pubsub = None + Timestamp = None @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.') +@unittest.skipIf(Timestamp is None, + 'Google Protobuf dependencies are not installed.') @mock.patch('time.sleep', return_value=None) @mock.patch('google.cloud.pubsub.SubscriberClient') class PubSubMatcherTest(unittest.TestCase): + @classmethod def setUpClass(cls): # Method has been renamed in Python 3 @@ -54,21 +56,14 @@ def setUpClass(cls): def setUp(self): self.mock_presult = mock.MagicMock() - def init_matcher( - self, expected_msg=None, with_attributes=False, strip_attributes=None): - self.pubsub_matcher = PubSubMessageMatcher( - 'mock_project', - 'mock_sub_name', - expected_msg, - with_attributes=with_attributes, - strip_attributes=strip_attributes) - - def init_counter_matcher(self, expected_msg_len=1): + def init_matcher(self, with_attributes=False, strip_attributes=None): self.pubsub_matcher = PubSubMessageMatcher( - 'mock_project', 'mock_sub_name', expected_msg_len=expected_msg_len) + 'mock_project', 'mock_sub_name', ['mock_expected_msg'], + with_attributes=with_attributes, strip_attributes=strip_attributes) def test_message_matcher_success(self, mock_get_sub, unsued_mock): - self.init_matcher(expected_msg=[b'a', b'b']) + self.init_matcher() + self.pubsub_matcher.expected_msg = [b'a', b'b'] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ create_pull_response([PullResponseMessage(b'a', {})]), @@ -79,127 +74,100 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock): self.assertEqual(mock_sub.acknowledge.call_count, 2) def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock): - self.init_matcher( - expected_msg=[PubsubMessage(b'a', {'k': 'v'})], with_attributes=True) + self.init_matcher(with_attributes=True) + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ - create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + create_pull_response([PullResponseMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())]) ] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) self.assertEqual(mock_sub.acknowledge.call_count, 1) def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock): - self.init_matcher( - expected_msg=[PubsubMessage(b'a', {})], with_attributes=True) + self.init_matcher(with_attributes=True) + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value # Unexpected attribute 'k'. mock_sub.pull.side_effect = [ - create_pull_response([PullResponseMessage(b'a', {'k': 'v'})]) + create_pull_response([PullResponseMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())]) ] - with self.assertRaisesRegex(AssertionError, r'Unexpected'): + with self.assertRaisesRegexp(AssertionError, r'Unexpected'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) self.assertEqual(mock_sub.acknowledge.call_count, 1) def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock): - self.init_matcher( - expected_msg=[PubsubMessage(b'a', {'k': 'v'})], - with_attributes=True, - strip_attributes=['id', 'timestamp']) + self.init_matcher(with_attributes=True, + strip_attributes=['id', 'timestamp']) + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value - mock_sub.pull.side_effect = [ - create_pull_response([ - PullResponseMessage( - b'a', { - 'id': 'foo', 'timestamp': 'bar', 'k': 'v' - }) - ]) - ] + mock_sub.pull.side_effect = [create_pull_response([ + PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'}, + '0123456789', Timestamp()) + ])] hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) self.assertEqual(mock_sub.acknowledge.call_count, 1) def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock): - self.init_matcher( - expected_msg=[PubsubMessage(b'a', {'k': 'v'})], - with_attributes=True, - strip_attributes=['id', 'timestamp']) + self.init_matcher(with_attributes=True, + strip_attributes=['id', 'timestamp']) + self.pubsub_matcher.expected_msg = [PubsubMessage(b'a', {'k': 'v'}, + '0123456789', + Timestamp())] mock_sub = mock_get_sub.return_value # Message is missing attribute 'timestamp'. - mock_sub.pull.side_effect = [ - create_pull_response( - [PullResponseMessage(b'a', { - 'id': 'foo', 'k': 'v' - })]) - ] - with self.assertRaisesRegex(AssertionError, r'Stripped attributes'): + mock_sub.pull.side_effect = [create_pull_response([ + PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'}, + '0123456789', Timestamp()) + ])] + with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) self.assertEqual(mock_sub.acknowledge.call_count, 1) def test_message_matcher_mismatch(self, mock_get_sub, unused_mock): - self.init_matcher(expected_msg=[b'a']) + self.init_matcher() + self.pubsub_matcher.expected_msg = [b'a'] mock_sub = mock_get_sub.return_value mock_sub.pull.side_effect = [ - create_pull_response( - [PullResponseMessage(b'c', {}), PullResponseMessage(b'd', {})]), + create_pull_response([PullResponseMessage(b'c', {}, '01', + Timestamp()), + PullResponseMessage(b'd', {}, '02', + Timestamp())]), ] with self.assertRaises(AssertionError) as error: hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertEqual(mock_sub.pull.call_count, 1) self.assertCountEqual([b'c', b'd'], self.pubsub_matcher.messages) - self.assertIn( - '\nExpected: Expected 1 messages.\n but: Got 2 messages.', - str(error.exception.args[0])) + self.assertTrue( + '\nExpected: Expected 1 messages.\n but: Got 2 messages.' + in str(error.exception.args[0])) self.assertEqual(mock_sub.pull.call_count, 1) self.assertEqual(mock_sub.acknowledge.call_count, 1) def test_message_matcher_timeout(self, mock_get_sub, unused_mock): - self.init_matcher(expected_msg=[b'a']) + self.init_matcher() mock_sub = mock_get_sub.return_value mock_sub.return_value.full_name.return_value = 'mock_sub' self.pubsub_matcher.timeout = 0.1 - with self.assertRaisesRegex(AssertionError, r'Expected 1.*\n.*Got 0'): - hc_assert_that(self.mock_presult, self.pubsub_matcher) - self.assertTrue(mock_sub.pull.called) - self.assertEqual(mock_sub.acknowledge.call_count, 0) - - def test_message_count_matcher_below_fail(self, mock_get_sub, unused_mock): - self.init_counter_matcher(expected_msg_len=1) - mock_sub = mock_get_sub.return_value - mock_sub.pull.side_effect = [ - create_pull_response( - [PullResponseMessage(b'c', {}), PullResponseMessage(b'd', {})]), - ] - with self.assertRaises(AssertionError) as error: - hc_assert_that(self.mock_presult, self.pubsub_matcher) - self.assertEqual(mock_sub.pull.call_count, 1) - self.assertIn( - '\nExpected: Expected 1 messages.\n but: Got 2 messages.', - str(error.exception.args[0])) - - def test_message_count_matcher_above_fail(self, mock_get_sub, unused_mock): - self.init_counter_matcher(expected_msg_len=1) - mock_sub = mock_get_sub.return_value - self.pubsub_matcher.timeout = 0.1 - with self.assertRaisesRegex(AssertionError, r'Expected 1.*\n.*Got 0'): + with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'): hc_assert_that(self.mock_presult, self.pubsub_matcher) self.assertTrue(mock_sub.pull.called) self.assertEqual(mock_sub.acknowledge.call_count, 0) - def test_message_count_matcher_success(self, mock_get_sub, unused_mock): - self.init_counter_matcher(expected_msg_len=15) - mock_sub = mock_get_sub.return_value - mock_sub.pull.side_effect = [ - create_pull_response( - [PullResponseMessage(b'a', {'foo': 'bar'}) for _ in range(15)]) - ] - hc_assert_that(self.mock_presult, self.pubsub_matcher) - self.assertEqual(mock_sub.pull.call_count, 1) - self.assertEqual(mock_sub.acknowledge.call_count, 1) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) - unittest.main() + unittest.main() \ No newline at end of file diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py b/sdks/python/apache_beam/io/gcp/tests/utils_test.py index 901f33a75225..a002be847b2f 100644 --- a/sdks/python/apache_beam/io/gcp/tests/utils_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py @@ -17,15 +17,11 @@ """Unittest for GCP testing utils.""" -# pytype: skip-file - from __future__ import absolute_import import logging import unittest -# patches unittest.TestCase to be python3 compatible -import future.tests.base # pylint: disable=unused-import import mock from apache_beam.io.gcp.pubsub import PubsubMessage @@ -37,15 +33,18 @@ from google.api_core import exceptions as gexc from google.cloud import bigquery from google.cloud import pubsub + from google.protobuf.timestamp_pb2 import Timestamp except ImportError: gexc = None bigquery = None pubsub = None + Timestamp = None @unittest.skipIf(bigquery is None, 'Bigquery dependencies are not installed.') @mock.patch.object(bigquery, 'Client') class UtilsTest(unittest.TestCase): + def setUp(self): test_utils.patch_retry(self, utils) @@ -66,7 +65,9 @@ def test_delete_table_succeeds(self, mock_client): mock_client.return_value.dataset.return_value.table.return_value = ( 'table_ref') - utils.delete_bq_table('unused_project', 'unused_dataset', 'unused_table') + utils.delete_bq_table('unused_project', + 'unused_dataset', + 'unused_table') mock_client.return_value.delete_table.assert_called_with('table_ref') def test_delete_table_fails_not_found(self, mock_client): @@ -74,19 +75,23 @@ def test_delete_table_fails_not_found(self, mock_client): 'table_ref') mock_client.return_value.delete_table.side_effect = gexc.NotFound('test') - with self.assertRaisesRegex(Exception, r'does not exist:.*table_ref'): - utils.delete_bq_table('unused_project', 'unused_dataset', 'unused_table') + with self.assertRaisesRegexp(Exception, r'does not exist:.*table_ref'): + utils.delete_bq_table('unused_project', + 'unused_dataset', + 'unused_table') @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') class PubSubUtilTest(unittest.TestCase): + def test_write_to_pubsub(self): mock_pubsub = mock.Mock() topic_path = "project/fakeproj/topics/faketopic" data = b'data' utils.write_to_pubsub(mock_pubsub, topic_path, [data]) mock_pubsub.publish.assert_has_calls( - [mock.call(topic_path, data), mock.call().result()]) + [mock.call(topic_path, data), + mock.call().result()]) def test_write_to_pubsub_with_attributes(self): mock_pubsub = mock.Mock() @@ -97,7 +102,8 @@ def test_write_to_pubsub_with_attributes(self): utils.write_to_pubsub( mock_pubsub, topic_path, [message], with_attributes=True) mock_pubsub.publish.assert_has_calls( - [mock.call(topic_path, data, **attributes), mock.call().result()]) + [mock.call(topic_path, data, **attributes), + mock.call().result()]) def test_write_to_pubsub_delay(self): number_of_elements = 2 @@ -113,8 +119,8 @@ def test_write_to_pubsub_delay(self): delay_between_chunks=123) mock_time.sleep.assert_called_with(123) mock_pubsub.publish.assert_has_calls( - [mock.call(topic_path, data), mock.call().result()] * - number_of_elements) + [mock.call(topic_path, data), + mock.call().result()] * number_of_elements) def test_write_to_pubsub_many_chunks(self): number_of_elements = 83 @@ -158,9 +164,18 @@ def test_read_from_pubsub_with_attributes(self): data = b'data' ack_id = 'ack_id' attributes = {'key': 'value'} - message = PubsubMessage(data, attributes) + message_id = '0123456789' + publish_seconds = 1520861821 + publish_nanos = 234567000 + publish_time = Timestamp(seconds=publish_seconds, nanos=publish_nanos) + message = PubsubMessage(data, attributes, message_id, publish_time) pull_response = test_utils.create_pull_response( - [test_utils.PullResponseMessage(data, attributes, ack_id=ack_id)]) + [test_utils.PullResponseMessage(data, attributes, + message_id=message_id, + publish_time=publish_time, + publish_time_secs=publish_seconds, + publish_time_nanos=publish_nanos, + ack_id=ack_id)]) mock_pubsub.pull.return_value = pull_response output = utils.read_from_pubsub( mock_pubsub, @@ -180,6 +195,7 @@ def test_read_from_pubsub_flaky(self): [test_utils.PullResponseMessage(data, ack_id=ack_id)]) class FlakyPullResponse(object): + def __init__(self, pull_response): self.pull_response = pull_response self._state = -1 @@ -208,21 +224,34 @@ def test_read_from_pubsub_many(self): 'data {}'.format(i).encode("utf-8") for i in range(number_of_elements) ] attributes_list = [{ - 'key': 'value {}'.format(i) - } for i in range(number_of_elements)] + 'key': 'value {}'.format(i)} for i in range(number_of_elements)] + message_id_list = ['0123456789_{}'.format(i) + for i in range(number_of_elements)] + publish_time_secs = 1520861821 + publish_time_nanos = 234567000 + publish_time_list = [Timestamp(seconds=publish_time_secs, + nanos=publish_time_nanos) + for i in range(number_of_elements)] ack_ids = ['ack_id_{}'.format(i) for i in range(number_of_elements)] messages = [ - PubsubMessage(data, attributes) for data, - attributes in zip(data_list, attributes_list) + PubsubMessage(data, attributes, message_id, publish_time) + for data, attributes, message_id, publish_time + in zip(data_list, attributes_list, + message_id_list, publish_time_list) ] response_messages = [ - test_utils.PullResponseMessage(data, attributes, ack_id=ack_id) - for data, - attributes, - ack_id in zip(data_list, attributes_list, ack_ids) + test_utils.PullResponseMessage( + data, attributes, message_id, + publish_time_secs=publish_time.seconds, + publish_time_nanos=publish_time.nanos, + ack_id=ack_id) + for data, attributes, message_id, publish_time, ack_id + in zip(data_list, attributes_list, message_id_list, + publish_time_list, ack_ids) ] class SequentialPullResponse(object): + def __init__(self, response_messages, response_size): self.response_messages = response_messages self.response_size = response_size @@ -248,9 +277,9 @@ def __call__(self, *args, **kwargs): def test_read_from_pubsub_invalid_arg(self): sub_client = mock.Mock() subscription_path = "project/fakeproj/subscriptions/fakesub" - with self.assertRaisesRegex(ValueError, "number_of_elements"): + with self.assertRaisesRegexp(ValueError, "number_of_elements"): utils.read_from_pubsub(sub_client, subscription_path) - with self.assertRaisesRegex(ValueError, "number_of_elements"): + with self.assertRaisesRegexp(ValueError, "number_of_elements"): utils.read_from_pubsub( sub_client, subscription_path, with_attributes=True) @@ -264,4 +293,4 @@ def _assert_ack_ids_equal(self, mock_pubsub, ack_ids): if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) - unittest.main() + unittest.main() \ No newline at end of file diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py index 14bda883eb5d..b1f46a344ee8 100644 --- a/sdks/python/apache_beam/testing/test_utils.py +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -20,8 +20,6 @@ For internal use only; no backwards-compatibility guarantees. """ -# pytype: skip-file - from __future__ import absolute_import import hashlib @@ -42,6 +40,7 @@ class TempDir(object): """Context Manager to create and clean-up a temporary directory.""" + def __init__(self): self._tempdir = tempfile.mkdtemp() @@ -66,9 +65,8 @@ def create_temp_file(self, suffix='', lines=None): Returns: The name of the temporary file created. """ - with tempfile.NamedTemporaryFile(delete=False, - dir=self._tempdir, - suffix=suffix) as f: + with tempfile.NamedTemporaryFile( + delete=False, dir=self._tempdir, suffix=suffix) as f: if lines: for line in lines: f.write(line) @@ -79,9 +77,8 @@ def create_temp_file(self, suffix='', lines=None): def compute_hash(content, hashing_alg=DEFAULT_HASHING_ALG): """Compute a hash value of a list of objects by hashing their string representations.""" - content = [ - str(x).encode('utf-8') if not isinstance(x, bytes) else x for x in content - ] + content = [str(x).encode('utf-8') if not isinstance(x, bytes) else x + for x in content] content.sort() m = hashlib.new(hashing_alg) for elem in content: @@ -106,15 +103,11 @@ def patch_retry(testcase, module): def patched_retry_with_exponential_backoff(num_retries, retry_filter): """A patch for retry decorator to use a mock dummy clock and logger.""" return real_retry_with_exponential_backoff( - num_retries=num_retries, - retry_filter=retry_filter, - logger=Mock(), + num_retries=num_retries, retry_filter=retry_filter, logger=Mock(), clock=Mock()) - patch.object( - retry, - 'with_exponential_backoff', - side_effect=patched_retry_with_exponential_backoff).start() + patch.object(retry, 'with_exponential_backoff', + side_effect=patched_retry_with_exponential_backoff).start() # Reload module after patching. imp.reload(module) @@ -128,7 +121,8 @@ def remove_patches(): @retry.with_exponential_backoff( - num_retries=3, retry_filter=retry.retry_on_beam_io_error_filter) + num_retries=3, + retry_filter=retry.retry_on_beam_io_error_filter) def delete_files(file_paths): """A function to clean up files or directories using ``FileSystems``. @@ -138,7 +132,8 @@ def delete_files(file_paths): file_paths: A list of strings contains file paths or directories. """ if len(file_paths) == 0: - raise RuntimeError('Clean up failed. Invalid file path: %s.' % file_paths) + raise RuntimeError('Clean up failed. Invalid file path: %s.' % + file_paths) FileSystems.delete(file_paths) @@ -159,15 +154,13 @@ class PullResponseMessage(object): Utility class for ``create_pull_response``. """ - def __init__( - self, - data, - attributes=None, - publish_time_secs=None, - publish_time_nanos=None, - ack_id=None): + def __init__(self, data, attributes=None, message_id=None, + publish_time=None, publish_time_secs=None, + publish_time_nanos=None, ack_id=None): self.data = data self.attributes = attributes + self.message_id = message_id + self.publish_time = publish_time self.publish_time_secs = publish_time_secs self.publish_time_nanos = publish_time_nanos self.ack_id = ack_id @@ -201,7 +194,10 @@ def create_pull_response(responses): if response.publish_time_nanos is not None: message.publish_time.nanos = response.publish_time_nanos + if response.message_id is not None: + message.message_id = response.message_id + if response.ack_id is not None: received_message.ack_id = response.ack_id - return res + return res \ No newline at end of file diff --git a/sdks/python/test-requirements.txt b/sdks/python/test-requirements.txt new file mode 100644 index 000000000000..399b3210dcfb --- /dev/null +++ b/sdks/python/test-requirements.txt @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +grpcio-tools==1.30.0 +future==0.18.2 +mypy-protobuf==1.18 +PyHamcrest==2.0.2 +mock==4.0.2 +dill==0.3.2 +python-dateutil==2.8.1 +pytz==2020.1 +fastavro==0.23.6 +numpy==1.19.0 +avro==1.10.0 +httplib2 +crcmod==1.7 +google-cloud-pubsub==1.0.2 \ No newline at end of file