diff --git a/spanner/tests/unit/test_keyset.py b/spanner/tests/unit/test_keyset.py index 8ff68d81d3cd..0dc70d2f494e 100644 --- a/spanner/tests/unit/test_keyset.py +++ b/spanner/tests/unit/test_keyset.py @@ -16,6 +16,9 @@ import unittest +import six + + class TestKeyRange(unittest.TestCase): def _getTargetClass(self): @@ -26,6 +29,15 @@ def _getTargetClass(self): def _make_one(self, *args, **kwargs): return self._getTargetClass()(*args, **kwargs) + def _check_unused_keys(self, krange, used_keys): + key_types = ('start_closed', + 'start_open', + 'end_closed', + 'end_open') + for key_type in key_types: + if key_type not in used_keys: + self.assertFalse(krange.to_pb().HasField(key_type)) + def test_ctor_no_start_no_end(self): with self.assertRaises(ValueError): self._make_one() @@ -119,6 +131,366 @@ def test_to_pb_w_start_open_and_end_closed(self): self.assertEqual(len(krange_pb.end_closed), 1) self.assertEqual(krange_pb.end_closed.values[0].string_value, KEY_2[0]) + def test_ctor_empty_list_start_closed_and_key_end_closed(self): + KEY_1 = [u'key_1'] + key_types = {'start_closed': [], 'end_closed': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_start_closed_and_key_end_open(self): + KEY_1 = [u'key_1'] + key_types = {'start_closed': [], 'end_open': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_end_closed_and_key_start_closed(self): + KEY_1 = [u'key_1'] + key_types = {'start_closed': KEY_1, 'end_closed': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_end_open_and_key_start_closed(self): + KEY_1 = [u'key_1'] + key_types = {'start_closed': KEY_1, 'end_open': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_start_open_and_key_end_closed(self): + KEY_1 = [u'key_1'] + key_types = {'start_open': [], 'end_closed': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_start_open_and_key_end_open(self): + KEY_1 = [u'key_1'] + key_types = {'start_open': [], 'end_open': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_end_closed_and_key_start_open(self): + KEY_1 = [u'key_1'] + key_types = {'start_open': KEY_1, 'end_closed': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_empty_list_end_open_and_key_start_open(self): + KEY_1 = [u'key_1'] + key_types = {'start_open': KEY_1, 'end_open': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_start_closed_and_key_end_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': [], 'end_closed': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_start_closed_and_key_end_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': [], 'end_open': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_end_closed_and_key_start_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_closed': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_end_open_and_key_start_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_open': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_start_open_and_key_end_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': [], 'end_closed': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_start_open_and_key_end_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': [], 'end_open': KEY_1} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_end_closed_and_key_start_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_closed': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_empty_list_end_open_and_key_start_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_open': []} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + if key_types[key_type] == KEY_1: + self.assertEqual(len(getattr(krange_pb, key_type)), 1) + key1 = getattr(krange_pb, key_type).values[0].string_value + self.assertEqual(key1, KEY_1[0]) + else: + self.assertEqual(len(getattr(krange_pb, key_type)), 0) + self._check_unused_keys(krange, used_keys) + + def test_ctor_keys_start_closed_end_closed(self): + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_closed': KEY_2} + krange = self._make_one(**key_types) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_keys_start_closed_end_open(self): + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_open': KEY_2} + krange = self._make_one(**key_types) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_keys_start_open_end_closed(self): + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_closed': KEY_2} + krange = self._make_one(**key_types) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_ctor_keys_start_open_end_open(self): + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_open': KEY_2} + krange = self._make_one(**key_types) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + self.assertEqual(getattr(krange, key_type), key_types[key_type]) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_keys_start_closed_end_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_closed': KEY_2} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + pb_key_type = getattr(krange_pb, key_type) + self.assertEqual(len(pb_key_type), 1) + self.assertEqual(pb_key_type.values[0].string_value, + key_types[key_type][0]) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_keys_start_closed_end_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_closed': KEY_1, 'end_open': KEY_2} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + pb_key_type = getattr(krange_pb, key_type) + self.assertEqual(len(pb_key_type), 1) + self.assertEqual(pb_key_type.values[0].string_value, + key_types[key_type][0]) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_keys_start_open_end_closed(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_closed': KEY_2} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + pb_key_type = getattr(krange_pb, key_type) + self.assertEqual(len(pb_key_type), 1) + self.assertEqual(pb_key_type.values[0].string_value, + key_types[key_type][0]) + self._check_unused_keys(krange, used_keys) + + def test_to_pb_keys_start_open_end_open(self): + from google.cloud.spanner_v1.proto.keys_pb2 import KeyRange + KEY_1 = [u'key_1'] + KEY_2 = [u'key_2'] + key_types = {'start_open': KEY_1, 'end_open': KEY_2} + krange = self._make_one(**key_types) + krange_pb = krange.to_pb() + self.assertIsInstance(krange_pb, KeyRange) + used_keys = [] + for key_type in six.iterkeys(key_types): + used_keys.append(key_type) + pb_key_type = getattr(krange_pb, key_type) + self.assertEqual(len(pb_key_type), 1) + self.assertEqual(pb_key_type.values[0].string_value, + key_types[key_type][0]) + self._check_unused_keys(krange, used_keys) + class TestKeySet(unittest.TestCase):